Browse Source

执行器回调日志落盘方案复用RPC序列化方案,并移除Jackson依赖;

xuxueli 6 years ago
parent
commit
46779d7e6b

+ 2 - 1
doc/XXL-JOB官方文档.md

@@ -1419,9 +1419,10 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 5、精简项目,取消第三方依赖,如 commons-collections4 ;
 - 6、底层Log调优,应用正常终止取消异常栈信息打印;
 - 7、交互优化,尽量避免新开页面窗口;仅WebIDE支持新开页,并提供窗口快速关闭按钮;
-- 8、[测试中]底层通讯方案优化:升级较新版本xxl-rpc,由"JETTY"方案调整为"NETTY_HTTP"方案,执行器内嵌netty-http-server提供服务,调度中心复用容器端口提供服务;
+- 8、底层通讯方案优化:升级较新版本xxl-rpc,由"JETTY"方案调整为"NETTY_HTTP"方案,执行器内嵌netty-http-server提供服务,调度中心复用容器端口提供服务;
 - 9、任务暂停、删除优化,避免quartz delete不完整导致任务脏数据;
 - 10、任务回调、心跳注册成功日志优化,非核心常规日志调整为debug级别,降低冗余日志输出;
+- 11、执行器回调日志落盘方案复用RPC序列化方案,并移除Jackson依赖;
 - [迭代中]注册中心优化,实时性注册发现:心跳注册间隔10s,refresh失败则首次注册并立即更新注册信息,心跳类似;30s过期销毁;
 - [迭代中]脚本任务,支持数据参数,新版本仅支持单参数不支持需要兼容;
 - [迭代中]提供执行器Docker镜像;

+ 0 - 2
pom.xml

@@ -41,8 +41,6 @@
 		<groovy.version>2.5.5</groovy.version>
 		<quartz.version>2.3.0</quartz.version>
 
-		<jackson.version>2.9.8</jackson.version>
-
 	</properties>
 
 	<build>

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java

@@ -1,7 +1,7 @@
 package com.xxl.job.admin.controller.resolver;
 
 import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.util.JacksonUtil;
+import com.xxl.job.admin.core.util.JacksonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;

+ 0 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/I18nUtil.java

@@ -1,7 +1,6 @@
 package com.xxl.job.admin.core.util;
 
 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
-import com.xxl.job.core.util.JacksonUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 1 - 1
xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java → xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java

@@ -1,4 +1,4 @@
-package com.xxl.job.core.util;
+package com.xxl.job.admin.core.util;
 
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.core.JsonParseException;

+ 0 - 7
xxl-job-core/pom.xml

@@ -36,13 +36,6 @@
 			<version>${commons-exec.version}</version>
 		</dependency>
 
-		<!-- jackson -->
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-databind</artifactId>
-			<version>${jackson.version}</version>
-		</dependency>
-
 		<!-- spring-context -->
 		<dependency>
 			<groupId>org.springframework</groupId>

+ 6 - 1
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java

@@ -107,7 +107,9 @@ public class XxlJobExecutor  {
 
     // ---------------------- admin-client (rpc invoker) ----------------------
     private static List<AdminBiz> adminBizList;
+    private static Serializer serializer;
     private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
+        serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
         if (adminAddresses!=null && adminAddresses.trim().length()>0) {
             for (String address: adminAddresses.trim().split(",")) {
                 if (address!=null && address.trim().length()>0) {
@@ -116,7 +118,7 @@ public class XxlJobExecutor  {
 
                     AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
                             NetEnum.NETTY_HTTP,
-                            Serializer.SerializeEnum.HESSIAN.getSerializer(),
+                            serializer,
                             CallType.SYNC,
                             LoadBalance.ROUND,
                             AdminBiz.class,
@@ -139,6 +141,9 @@ public class XxlJobExecutor  {
     public static List<AdminBiz> getAdminBizList(){
         return adminBizList;
     }
+    public static Serializer getSerializer() {
+        return serializer;
+    }
 
 
     // ---------------------- executor-server (rpc provider) ----------------------

+ 37 - 31
xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java

@@ -8,7 +8,6 @@ import com.xxl.job.core.executor.XxlJobExecutor;
 import com.xxl.job.core.log.XxlJobFileAppender;
 import com.xxl.job.core.log.XxlJobLogger;
 import com.xxl.job.core.util.FileUtil;
-import com.xxl.job.core.util.JacksonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -190,46 +189,53 @@ public class TriggerCallbackThread {
 
     // ---------------------- fail-callback file ----------------------
 
-    private static String failCallbackFileName = XxlJobFileAppender.getLogPath().concat(File.separator).concat("xxl-job-callback").concat(".log");
+    private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator);
+    private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");
 
     private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
+        // valid
+        if (callbackParamList==null || callbackParamList.size()==0) {
+            return;
+        }
+
         // append file
-        String content = JacksonUtil.writeValueAsString(callbackParamList);
-        FileUtil.appendFileLine(failCallbackFileName, content);
+        byte[] callbackParamList_bytes = XxlJobExecutor.getSerializer().serialize(callbackParamList);
+
+        File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
+        if (callbackLogFile.exists()) {
+            for (int i = 0; i < 100; i++) {
+                callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
+                if (!callbackLogFile.exists()) {
+                    break;
+                }
+            }
+        }
+        FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
     }
 
     private void retryFailCallbackFile(){
 
-        // load and clear file
-        List<String> fileLines = FileUtil.loadFileLines(failCallbackFileName);
-        FileUtil.deleteFile(failCallbackFileName);
-
-        // parse
-        List<HandleCallbackParam> failCallbackParamList = new ArrayList<>();
-        if (fileLines!=null && fileLines.size()>0) {
-            for (String line: fileLines) {
-                List<HandleCallbackParam> failCallbackParamListTmp = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class);
-                if (failCallbackParamListTmp!=null && failCallbackParamListTmp.size()>0) {
-                    failCallbackParamList.addAll(failCallbackParamListTmp);
-                }
-            }
+        // valid
+        File callbackLogPath = new File(failCallbackFilePath);
+        if (!callbackLogPath.exists()) {
+            return;
+        }
+        if (callbackLogPath.isFile()) {
+            callbackLogPath.delete();
+        }
+        if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
+            return;
         }
 
-        // retry callback, 100 lines per page
-        if (failCallbackParamList!=null && failCallbackParamList.size()>0) {
-            int pagesize = 100;
-            List<HandleCallbackParam> pageData = new ArrayList<>();
-            for (int i = 0; i < failCallbackParamList.size(); i++) {
-                pageData.add(failCallbackParamList.get(i));
-                if (i>0 && i%pagesize == 0) {
-                    doCallback(pageData);
-                    pageData.clear();
-                }
-            }
-            if (pageData.size() > 0) {
-                doCallback(pageData);
-            }
+        // load and clear file, retry
+        for (File callbaclLogFile: callbackLogPath.listFiles()) {
+            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
+            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) XxlJobExecutor.getSerializer().deserialize(callbackParamList_bytes, HandleCallbackParam.class);
+
+            callbaclLogFile.delete();
+            doCallback(callbackParamList);
         }
+
     }
 
 }

+ 65 - 5
xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java

@@ -3,9 +3,10 @@ package com.xxl.job.core.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 
 /**
  * file tool
@@ -15,6 +16,7 @@ import java.util.List;
 public class FileUtil {
     private static Logger logger = LoggerFactory.getLogger(FileUtil.class);
 
+
     /**
      * delete recursively
      *
@@ -36,6 +38,7 @@ public class FileUtil {
         return false;
     }
 
+
     public static void deleteFile(String fileName) {
         // file
         File file = new File(fileName);
@@ -44,7 +47,64 @@ public class FileUtil {
         }
     }
 
-    public static void appendFileLine(String fileName, String content) {
+
+    public static void writeFileContent(File file, byte[] data) {
+
+        // file
+        if (!file.exists()) {
+            try {
+                file.createNewFile();
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                return;
+            }
+        }
+
+        // append file content
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(file);
+            fos.write(data);
+            fos.flush();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (fos != null) {
+                try {
+                    fos.close();
+                } catch (IOException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+
+    }
+
+    public static byte[] readFileContent(File file) {
+        Long filelength = file.length();
+        byte[] filecontent = new byte[filelength.intValue()];
+
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(file);
+            in.read(filecontent);
+            in.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        return filecontent;
+    }
+
+
+    /*public static void appendFileLine(String fileName, String content) {
 
         // file
         File file = new File(fileName);
@@ -119,6 +179,6 @@ public class FileUtil {
         }
 
         return result;
-    }
+    }*/
 
 }