xuxueli 5 éve
szülő
commit
18162e75c2

+ 56 - 31
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java

@@ -25,6 +25,8 @@ public class JobScheduleHelper {
         return instance;
     }
 
+    public static final long PRE_READ_MS = 5000;    // pre read
+
     private Thread scheduleThread;
     private Thread ringThread;
     private volatile boolean scheduleThreadToStop = false;
@@ -47,11 +49,11 @@ public class JobScheduleHelper {
                 }
                 logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
 
+                Connection conn = null;
                 while (!scheduleThreadToStop) {
 
                     // 扫描任务
                     long start = System.currentTimeMillis();
-                    Connection conn = null;
                     PreparedStatement preparedStatement = null;
                     try {
                         if (conn==null || conn.isClosed()) {
@@ -65,16 +67,16 @@ public class JobScheduleHelper {
                         // tx start
 
                         // 1、预读5s内调度任务
-                        long maxNextTime = System.currentTimeMillis() + 5000;
-                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime);
+                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(System.currentTimeMillis() + PRE_READ_MS);
                         if (scheduleList!=null && scheduleList.size()>0) {
                             // 2、推送时间轮
                             for (XxlJobInfo jobInfo: scheduleList) {
 
                                 // 时间轮刻度计算
-                                if (System.currentTimeMillis() > jobInfo.getTriggerNextTime() + 5000) {
+                                if (System.currentTimeMillis() > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                     // 过期超5s:本地忽略,当前时间开始计算下次触发时间
 
+                                    // fresh next
                                     jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                     jobInfo.setTriggerNextTime(
                                             new CronExpression(jobInfo.getJobCron())
@@ -82,44 +84,55 @@ public class JobScheduleHelper {
                                                     .getTime()
                                     );
 
-                                    // pass
-                                    continue;
-
                                 } else if (System.currentTimeMillis() > jobInfo.getTriggerNextTime()) {
-                                    // 过期5s内 :立即触发一次,当前时间开始计算下次触发时间
+                                    // 过期5s内 :立即触发一次,当前时间开始计算下次触发时间;一旦过期,预读一次;
 
-                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
-                                    jobInfo.setTriggerNextTime(
-                                            new CronExpression(jobInfo.getJobCron())
-                                                    .getNextValidTimeAfter(new Date())
-                                                    .getTime()
-                                    );
+                                    CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
+                                    long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();
 
-                                    // do trigger
+                                    // 1、trigger
                                     JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
 
+                                    // 2、fresh next
+                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
+                                    jobInfo.setTriggerNextTime(nextTime);
+
+                                    // 3、check pre read
+                                    if (jobInfo.getTriggerNextTime() - System.currentTimeMillis() < PRE_READ_MS) {
+
+                                        // 1、make ring second
+                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
+
+                                        // 2、push time ring
+                                        pushTimeRing(ringSecond, jobInfo.getId());
+
+                                        // 3、fresh next
+                                        jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
+                                        jobInfo.setTriggerNextTime(
+                                                new CronExpression(jobInfo.getJobCron())
+                                                        .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()))
+                                                        .getTime()
+                                        );
+
+                                    }
+
                                     logger.debug(">>>>>>>>>>> xxl-job, push trigger : jobId = " + jobInfo.getId() );
                                 } else {
                                     // 未过期:正常触发,递增计算下次触发时间
 
+                                    // 1、make ring second
                                     int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 
+                                    // 2、push time ring
+                                    pushTimeRing(ringSecond, jobInfo.getId());
+
+                                    // 3、fresh next
                                     jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                     jobInfo.setTriggerNextTime(
                                             new CronExpression(jobInfo.getJobCron())
                                                     .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()))
                                                     .getTime()
                                     );
-
-                                    // push async ring
-                                    List<Integer> ringItemData = ringData.get(ringSecond);
-                                    if (ringItemData == null) {
-                                        ringItemData = new ArrayList<Integer>();
-                                        ringData.put(ringSecond, ringItemData);
-                                    }
-                                    ringItemData.add(jobInfo.getId());
-
-                                    logger.debug(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
                                 }
 
                             }
@@ -139,12 +152,6 @@ public class JobScheduleHelper {
                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                         }
                     } finally {
-                        if (conn != null) {
-                            try {
-                                conn.close();
-                            } catch (SQLException e) {
-                            }
-                        }
                         if (null != preparedStatement) {
                             try {
                                 preparedStatement.close();
@@ -166,6 +173,12 @@ public class JobScheduleHelper {
                     }
 
                 }
+                if (conn != null) {
+                    try {
+                        conn.close();
+                    } catch (SQLException e) {
+                    }
+                }
                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
             }
         });
@@ -235,6 +248,18 @@ public class JobScheduleHelper {
         ringThread.start();
     }
 
+    private void pushTimeRing(int ringSecond, int jobId){
+        // push async ring
+        List<Integer> ringItemData = ringData.get(ringSecond);
+        if (ringItemData == null) {
+            ringItemData = new ArrayList<Integer>();
+            ringData.put(ringSecond, ringItemData);
+        }
+        ringItemData.add(jobId);
+
+        logger.debug(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
+    }
+
     public void toStop(){
 
         // 1、stop schedule

+ 3 - 2
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java

@@ -4,6 +4,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
 import com.xxl.job.admin.core.model.XxlJobInfo;
 import com.xxl.job.admin.core.cron.CronExpression;
 import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
+import com.xxl.job.admin.core.thread.JobScheduleHelper;
 import com.xxl.job.admin.core.util.I18nUtil;
 import com.xxl.job.admin.dao.XxlJobGroupDao;
 import com.xxl.job.admin.dao.XxlJobInfoDao;
@@ -193,7 +194,7 @@ public class XxlJobServiceImpl implements XxlJobService {
 		long nextTriggerTime = exists_jobInfo.getTriggerNextTime();
 		if (exists_jobInfo.getTriggerStatus() == 1 && !jobInfo.getJobCron().equals(exists_jobInfo.getJobCron()) ) {
 			try {
-				nextTriggerTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + 5000)).getTime();
+				nextTriggerTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS)).getTime();
 			} catch (ParseException e) {
 				logger.error(e.getMessage(), e);
 				return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage());
@@ -239,7 +240,7 @@ public class XxlJobServiceImpl implements XxlJobService {
 		// next trigger time (5s后生效,避开预读周期)
 		long nextTriggerTime = 0;
 		try {
-			nextTriggerTime = new CronExpression(xxlJobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + 5000)).getTime();
+			nextTriggerTime = new CronExpression(xxlJobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS)).getTime();
 		} catch (ParseException e) {
 			logger.error(e.getMessage(), e);
 			return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage());