Browse Source

biz thread pool

xuxueli 4 years ago
parent
commit
8360707bb9
1 changed files with 36 additions and 9 deletions
  1. 36 9
      xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java

+ 36 - 9
xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java

@@ -20,7 +20,7 @@ import io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * Copy from : https://github.com/xuxueli/xxl-rpc
@@ -43,6 +43,25 @@ public class EmbedServer {
                 // param
                 EventLoopGroup bossGroup = new NioEventLoopGroup();
                 EventLoopGroup workerGroup = new NioEventLoopGroup();
+                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
+                        0,
+                        200,
+                        60L,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<Runnable>(2000),
+                        new ThreadFactory() {
+                            @Override
+                            public Thread newThread(Runnable r) {
+                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
+                            }
+                        },
+                        new RejectedExecutionHandler() {
+                            @Override
+                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
+                            }
+                        });
+
 
                 try {
                     // start server
@@ -56,7 +75,7 @@ public class EmbedServer {
                                             .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                             .addLast(new HttpServerCodec())
                                             .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
-                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken));
+                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                 }
                             })
                             .childOption(ChannelOption.SO_KEEPALIVE, true);
@@ -121,9 +140,11 @@ public class EmbedServer {
 
         private ExecutorBiz executorBiz;
         private String accessToken;
-        public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken) {
+        private ThreadPoolExecutor bizThreadPool;
+        public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
             this.executorBiz = executorBiz;
             this.accessToken = accessToken;
+            this.bizThreadPool = bizThreadPool;
         }
 
         @Override
@@ -137,14 +158,20 @@ public class EmbedServer {
             boolean keepAlive = HttpUtil.isKeepAlive(msg);
             String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
 
-            // do invoke
-            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
+            // invoke
+            bizThreadPool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    // do invoke
+                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
 
-            // to json
-            String responseJson = GsonTool.toJson(responseObj);
+                    // to json
+                    String responseJson = GsonTool.toJson(responseObj);
 
-            // write response
-            writeResponse(ctx, keepAlive, responseJson);
+                    // write response
+                    writeResponse(ctx, keepAlive, responseJson);
+                }
+            });
         }
 
         private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {