Browse Source

调度中心任务平均分配,触发组件每次获取与线程池数量相关数量的任务,避免大量任务集中在单个调度中心集群节点;

xuxueli 5 years ago
parent
commit
8b2731d2dd

+ 10 - 10
doc/XXL-JOB官方文档.md

@@ -1576,18 +1576,17 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 9、任务触发组件加载顺序调整,避免小概率情况下组件随机加载顺序导致的I18N的NPE问题;
 - 10、项目依赖升级至较新稳定版本,如spring、spring-boot、mybatis、slf4j、groovy等等;
 - 11、JobThread自销毁优化,避免并发触发导致triggerQueue中任务丢失问题;
-- 12、[ING,交互兼容问题待处理]Cron在线生成工具:任务新增、编辑框通过组件在线生成Cron表达式;
+- 12、Cron在线生成工具:任务新增、编辑框通过组件在线生成Cron表达式;
 - 13、Cron下次执行时间查询:支持通过界面在线查看后续连续5次执行时间;
 - 14、任务重试时参数丢失的问题修复;
-- 15、[ING]xxl-rpc服务端线程优化,降低线程内存开销;
-- 16、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表;
-- 17、[ING]父子任务参数传递;流程任务等,透传动态参数;
-- 18、[ING]调度中心任务平均分配,触发组件每次获取与线程池数量相关数量的任务,避免大量任务集中在单个调度中心集群节点。
-- 19、调度中心密码限制18位,修复修改密码超过18位无法登陆的问题;
-- 20、[ING]调度中心日志删除,改为分页获取ID,根据ID删除的方式;
-- 21、[ING]任务回调改为restful方式;
-- 22、任务告警组件分页参数无效问题修复;
-- 23、DB脚本默认编码改为utf8mb4,修复字符乱码问题(建议Mysql版本5.7+);
+- 15、调度中心密码限制18位,修复修改密码超过18位无法登陆的问题;
+- 16、任务告警组件分页参数无效问题修复;
+- 17、DB脚本默认编码改为utf8mb4,修复字符乱码问题(建议Mysql版本5.7+);
+- 18、调度中心任务平均分配,触发组件每次获取与线程池数量相关数量的任务,避免大量任务集中在单个调度中心集群节点;
+- 19、[ING]xxl-rpc服务端线程优化,降低线程内存开销;
+- 20、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表;
+- 21、[ING]调度中心日志删除,改为分页获取ID,根据ID删除的方式;
+- 22、[ING]任务回调改为restful方式;
 
 
 ### TODO LIST
@@ -1625,6 +1624,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 32、AccessToken按照执行器维度设置;控制调度、回调;
 - 33、任务执行一次的时候指定IP;
 - 34、通讯调整;双向HTTP,回调和其他API自定义AccessToken,Restful,执行器复用容器端口;
+- 35、父子任务参数传递;流程任务等,透传动态参数;
 
 
 ## 七、其他

+ 20 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java

@@ -60,6 +60,12 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
     @Value("${spring.mail.username}")
     private String emailUserName;
 
+    @Value("${xxl.job.triggerpool.fast.max}")
+    private int triggerPoolFastMax;
+
+    @Value("${xxl.job.triggerpool.slow.max}")
+    private int triggerPoolSlowMax;
+
     // dao, service
 
     @Resource
@@ -90,6 +96,20 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
         return emailUserName;
     }
 
+    public int getTriggerPoolFastMax() {
+        if (triggerPoolFastMax < 200) {
+            return 200;
+        }
+        return triggerPoolFastMax;
+    }
+
+    public int getTriggerPoolSlowMax() {
+        if (triggerPoolSlowMax < 100) {
+            return 100;
+        }
+        return triggerPoolSlowMax;
+    }
+
     public XxlJobLogDao getXxlJobLogDao() {
         return xxlJobLogDao;
     }

+ 8 - 5
xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java

@@ -39,14 +39,17 @@ public class XxlJobScheduler  {
         // init i18n
         initI18n();
 
+        // admin-server
+        initRpcProvider();
+
         // admin registry monitor run
         JobRegistryMonitorHelper.getInstance().start();
 
         // admin monitor run
         JobFailMonitorHelper.getInstance().start();
 
-        // admin-server
-        initRpcProvider();
+        // admin trigger pool start
+        JobTriggerPoolHelper.toStart();
 
         // start-schedule
         JobScheduleHelper.getInstance().start();
@@ -63,12 +66,12 @@ public class XxlJobScheduler  {
         // admin trigger pool stop
         JobTriggerPoolHelper.toStop();
 
-        // admin registry stop
-        JobRegistryMonitorHelper.getInstance().toStop();
-
         // admin monitor stop
         JobFailMonitorHelper.getInstance().toStop();
 
+        // admin registry stop
+        JobRegistryMonitorHelper.getInstance().toStop();
+
         // admin-server
         stopRpcProvider();
     }

+ 4 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java

@@ -50,6 +50,9 @@ public class JobScheduleHelper {
                 }
                 logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
 
+                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
+                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
+
                 while (!scheduleThreadToStop) {
 
                     // Scan Job
@@ -73,7 +76,7 @@ public class JobScheduleHelper {
 
                         // 1、pre read
                         long nowTime = System.currentTimeMillis();
-                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
+                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                         if (scheduleList!=null && scheduleList.size()>0) {
                             // 2、push time-ring
                             for (XxlJobInfo jobInfo: scheduleList) {

+ 47 - 35
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java

@@ -1,5 +1,6 @@
 package com.xxl.job.admin.core.thread;
 
+import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
 import com.xxl.job.admin.core.trigger.XxlJobTrigger;
 import org.slf4j.Logger;
@@ -20,31 +21,44 @@ public class JobTriggerPoolHelper {
     // ---------------------- trigger pool ----------------------
 
     // fast/slow thread pool
-    private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
-            50,
-            200,
-            60L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>(1000),
-            new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
-                }
-            });
-
-    private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
-            10,
-            100,
-            60L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>(2000),
-            new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
-                }
-            });
+    private ThreadPoolExecutor fastTriggerPool = null;
+    private ThreadPoolExecutor slowTriggerPool = null;
+
+    public void start(){
+        fastTriggerPool = new ThreadPoolExecutor(
+                10,
+                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
+                60L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(1000),
+                new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
+                    }
+                });
+
+        slowTriggerPool = new ThreadPoolExecutor(
+                10,
+                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
+                60L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(2000),
+                new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
+                    }
+                });
+    }
+
+
+    public void stop() {
+        //triggerPool.shutdown();
+        fastTriggerPool.shutdownNow();
+        slowTriggerPool.shutdownNow();
+        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
+    }
 
 
     // job timeout count
@@ -100,17 +114,19 @@ public class JobTriggerPoolHelper {
         });
     }
 
-    public void stop() {
-        //triggerPool.shutdown();
-        fastTriggerPool.shutdownNow();
-        slowTriggerPool.shutdownNow();
-        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
-    }
+
 
     // ---------------------- helper ----------------------
 
     private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
 
+    public static void toStart() {
+        helper.start();
+    }
+    public static void toStop() {
+        helper.stop();
+    }
+
     /**
      * @param jobId
      * @param triggerType
@@ -126,8 +142,4 @@ public class JobTriggerPoolHelper {
         helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
     }
 
-    public static void toStop() {
-        helper.stop();
-    }
-
 }

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java

@@ -41,7 +41,7 @@ public interface XxlJobInfoDao {
 
 	public int findAllCount();
 
-	public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime);
+	public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize );
 
 	public int scheduleUpdate(XxlJobInfo xxlJobInfo);
 

+ 4 - 0
xxl-job-admin/src/main/resources/application.properties

@@ -44,3 +44,7 @@ xxl.job.accessToken=
 
 ### xxl-job, i18n (default empty as chinese, "en" as english)
 xxl.job.i18n=
+
+## xxl-job, triggerpool max size
+xxl.job.triggerpool.fast.max=200
+xxl.job.triggerpool.slow.max=100

+ 2 - 0
xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml

@@ -213,6 +213,8 @@
 		FROM xxl_job_info AS t
 		WHERE t.trigger_status = 1
 			and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
+		ORDER BY id ASC
+		LIMIT #{pagesize}
 	</select>
 
 	<update id="scheduleUpdate" parameterType="com.xxl.job.admin.core.model.XxlJobInfo"  >