Browse Source

调度组件销毁流程优化,先停止调度线程,然后等待时间轮内存量任务处理完成,最终销毁时间轮线程;

xuxueli 5 years ago
parent
commit
293ffca14d

+ 1 - 0
doc/XXL-JOB官方文档.md

@@ -1477,6 +1477,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 11、升级xxl-rpc至较新版本,修复代理服务初始化时远程服务不可用导致长连冗余创建的问题;
 - 12、首页调度报表的日期排序在TIDB下乱序问题修复;
 - 13、调度中心与执行器双向通讯超时时间调整为3s;
+- 14、调度组件销毁流程优化,先停止调度线程,然后等待时间轮内存量任务处理完成,最终销毁时间轮线程;
 
 
 ### 6.26 版本 v2.1.1 Release Notes[规划中]

+ 55 - 16
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java

@@ -27,7 +27,8 @@ public class JobScheduleHelper {
 
     private Thread scheduleThread;
     private Thread ringThread;
-    private volatile boolean toStop = false;
+    private volatile boolean scheduleThreadToStop = false;
+    private volatile boolean ringThreadToStop = false;
     private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
 
     public void start(){
@@ -40,13 +41,13 @@ public class JobScheduleHelper {
                 try {
                     TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                 } catch (InterruptedException e) {
-                    if (!toStop) {
+                    if (!scheduleThreadToStop) {
                         logger.error(e.getMessage(), e);
                     }
                 }
                 logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
 
-                while (!toStop) {
+                while (!scheduleThreadToStop) {
 
                     // 扫描任务
                     long start = System.currentTimeMillis();
@@ -127,7 +128,7 @@ public class JobScheduleHelper {
 
                         conn.commit();
                     } catch (Exception e) {
-                        if (!toStop) {
+                        if (!scheduleThreadToStop) {
                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                         }
                     } finally {
@@ -152,7 +153,7 @@ public class JobScheduleHelper {
                             TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                         }
                     } catch (InterruptedException e) {
-                        if (!toStop) {
+                        if (!scheduleThreadToStop) {
                             logger.error(e.getMessage(), e);
                         }
                     }
@@ -175,13 +176,13 @@ public class JobScheduleHelper {
                 try {
                     TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                 } catch (InterruptedException e) {
-                    if (!toStop) {
+                    if (!ringThreadToStop) {
                         logger.error(e.getMessage(), e);
                     }
                 }
 
                 int lastSecond = -1;
-                while (!toStop) {
+                while (!ringThreadToStop) {
 
                     try {
                         // second data
@@ -216,7 +217,7 @@ public class JobScheduleHelper {
                             ringItemData.clear();
                         }
                     } catch (Exception e) {
-                        if (!toStop) {
+                        if (!ringThreadToStop) {
                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                         }
                     }
@@ -225,7 +226,7 @@ public class JobScheduleHelper {
                     try {
                         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                     } catch (InterruptedException e) {
-                        if (!toStop) {
+                        if (!ringThreadToStop) {
                             logger.error(e.getMessage(), e);
                         }
                     }
@@ -239,23 +240,61 @@ public class JobScheduleHelper {
     }
 
     public void toStop(){
-        toStop = true;
 
-        // interrupt and wait
-        scheduleThread.interrupt();
+        // 1、stop schedule
+        scheduleThreadToStop = true;
         try {
-            scheduleThread.join();
+            TimeUnit.SECONDS.sleep(1);  // wait
         } catch (InterruptedException e) {
             logger.error(e.getMessage(), e);
         }
+        if (scheduleThread.getState() != Thread.State.TERMINATED){
+            // interrupt and wait
+            scheduleThread.interrupt();
+            try {
+                scheduleThread.join();
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
 
-        // interrupt and wait
-        ringThread.interrupt();
+        // if has ring data
+        boolean hasRingData = false;
+        if (!ringData.isEmpty()) {
+            for (int second : ringData.keySet()) {
+                List<Integer> tmpData = ringData.get(second);
+                if (tmpData!=null && tmpData.size()>0) {
+                    hasRingData = true;
+                    break;
+                }
+            }
+        }
+        if (hasRingData) {
+            try {
+                TimeUnit.SECONDS.sleep(8);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        // stop ring (wait job-in-memory stop)
+        ringThreadToStop = true;
         try {
-            ringThread.join();
+            TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
             logger.error(e.getMessage(), e);
         }
+        if (ringThread.getState() != Thread.State.TERMINATED){
+            // interrupt and wait
+            ringThread.interrupt();
+            try {
+                ringThread.join();
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
     }
 
 }