Ver Fonte

降低回调频率提升执行器性能

xuxueli há 7 anos atrás
pai
commit
d1d8d61c49

+ 2 - 1
doc/XXL-JOB官方文档.md

@@ -866,6 +866,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 5、路由策略代码重构;
 - 6、执行器重复注册问题修复;
 - 7、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。
+- 8、执行器任务执行结果批量回调,降低回调频率提升执行器性能;
 
 #### TODO LIST
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
@@ -876,7 +877,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 6、任务依赖,流程图,子任务+会签任务,各节点日志;
 - 7、调度任务优先级;
 - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。
-
+- 9、任务执行结果回调失败后重试:待定,防止回调死循环;
 
 ## 七、其他
 

+ 11 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java

@@ -24,6 +24,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
 import javax.annotation.Resource;
 import java.text.MessageFormat;
 import java.util.Date;
+import java.util.List;
 
 /**
  * Created by xuxueli on 17/5/10.
@@ -43,9 +44,18 @@ public class JobApiController {
     @RequestMapping(value= AdminApiUtil.CALLBACK, method = RequestMethod.POST, consumes = "application/json")
     @ResponseBody
     @PermessionLimit(limit=false)
-    public ReturnT<String> callback(@RequestBody HandleCallbackParam handleCallbackParam){
+    public ReturnT<String> callback(@RequestBody List<HandleCallbackParam> callbackParamList){
 
+        for (HandleCallbackParam handleCallbackParam: callbackParamList) {
+            ReturnT<String> callbackResult = callback(handleCallbackParam);
+            logger.info("JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
+                    (callbackResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
+        }
+
+        return ReturnT.SUCCESS;
+    }
 
+    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
         // valid log item
         XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
         if (log == null) {

+ 12 - 3
xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java

@@ -6,6 +6,8 @@ import com.xxl.job.core.util.AdminApiUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -32,12 +34,19 @@ public class TriggerCallbackThread {
                     try {
                         HandleCallbackParam callback = getInstance().callBackQueue.take();
                         if (callback != null) {
-                            // callback
+
+                            // callback list
+                            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
+                            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
+                            callbackParamList.add(callback);
+
+                            // callback, will retry if error
                             try {
-                                ReturnT<String> callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callback);
-                                logger.info(">>>>>>>>>>> xxl-job callback, HandleCallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()});
+                                ReturnT<String> callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callbackParamList);
+                                logger.info(">>>>>>>>>>> xxl-job callback, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
                             } catch (Exception e) {
                                 logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e);
+                                //getInstance().callBackQueue.addAll(callbackParamList);
                             }
                         }
                     } catch (Exception e) {