Browse Source

refactor: rename RunnableAndCatch to SafeRunnable

tjq 1 year ago
parent
commit
599d710e27

+ 4 - 3
powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java

@@ -1,23 +1,24 @@
-package tech.powerjob.worker.background;
+package tech.powerjob.common.enhance;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
+ * 安全的 runnable,可防止因抛出异常导致周期性任务终止
  * 使用 {@link ScheduledExecutorService} 执行任务时,推荐继承此类捕获并打印异常,避免因为抛出异常导致周期性任务终止
  *
  * @author songyinyin
  * @since 2023/9/20 15:52
  */
 @Slf4j
-public abstract class RunnableAndCatch implements Runnable{
+public abstract class SafeRunnable implements Runnable{
   @Override
   public void run() {
     try {
       run0();
     } catch (Exception e) {
-      log.error("[RunnableAndCatch] run failed", e);
+      log.error("[SafeRunnable] run failed", e);
     }
   }
 

+ 4 - 4
powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java

@@ -1,4 +1,4 @@
-package tech.powerjob.worker.background;
+package tech.powerjob.common.enhance;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -11,11 +11,11 @@ import java.util.concurrent.ScheduledExecutorService;
  * @since 2023/9/20 16:04
  */
 @Slf4j
-public class RunnableWrapper implements Runnable {
+public class SafeRunnableWrapper implements Runnable {
 
   private final Runnable runnable;
 
-  public RunnableWrapper(Runnable runnable) {
+  public SafeRunnableWrapper(Runnable runnable) {
     this.runnable = runnable;
   }
 
@@ -24,7 +24,7 @@ public class RunnableWrapper implements Runnable {
     try {
       runnable.run();
     } catch (Exception e) {
-      log.error("[RunnableWrapper] run failed", e);
+      log.error("[SafeRunnableWrapper] run failed", e);
     }
   }
 }

+ 2 - 1
powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java

@@ -1,5 +1,6 @@
 package tech.powerjob.worker.background;
 
+import tech.powerjob.common.enhance.SafeRunnable;
 import tech.powerjob.common.enums.LogLevel;
 import tech.powerjob.common.model.InstanceLogContent;
 import tech.powerjob.common.request.WorkerLogReportReq;
@@ -69,7 +70,7 @@ public class OmsLogHandler {
 
 
 
-    private class LogSubmitter extends RunnableAndCatch {
+    private class LogSubmitter extends SafeRunnable {
 
         @Override
         public void run0() {

+ 2 - 1
powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java

@@ -3,6 +3,7 @@ package tech.powerjob.worker.background;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import tech.powerjob.common.enhance.SafeRunnable;
 import tech.powerjob.common.model.SystemMetrics;
 import tech.powerjob.common.request.WorkerHeartbeat;
 import tech.powerjob.worker.common.PowerJobWorkerVersion;
@@ -22,7 +23,7 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
  */
 @Slf4j
 @RequiredArgsConstructor
-public class WorkerHealthReporter extends RunnableAndCatch {
+public class WorkerHealthReporter extends SafeRunnable {
 
     private final WorkerRuntime workerRuntime;
 

+ 2 - 2
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java

@@ -9,7 +9,7 @@ import tech.powerjob.common.enums.ProcessorType;
 import tech.powerjob.common.enums.TimeExpressionType;
 import tech.powerjob.common.utils.CollectionUtils;
 import tech.powerjob.common.utils.CommonUtils;
-import tech.powerjob.worker.background.RunnableAndCatch;
+import tech.powerjob.common.enhance.SafeRunnable;
 import tech.powerjob.worker.common.WorkerRuntime;
 import tech.powerjob.worker.common.constants.TaskStatus;
 import tech.powerjob.worker.common.utils.TransportUtils;
@@ -238,7 +238,7 @@ public class ProcessorTracker {
     /**
      * 定时向 TaskTracker 汇报(携带任务执行信息的心跳)
      */
-    private class CheckerAndReporter extends RunnableAndCatch {
+    private class CheckerAndReporter extends SafeRunnable {
 
         @Override
         @SuppressWarnings({"squid:S1066","squid:S3776"})

+ 3 - 3
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

@@ -19,7 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils;
 import tech.powerjob.common.utils.CollectionUtils;
 import tech.powerjob.common.utils.CommonUtils;
 import tech.powerjob.common.utils.SegmentLock;
-import tech.powerjob.worker.background.RunnableAndCatch;
+import tech.powerjob.common.enhance.SafeRunnable;
 import tech.powerjob.worker.common.WorkerRuntime;
 import tech.powerjob.worker.common.constants.TaskConstant;
 import tech.powerjob.worker.common.constants.TaskStatus;
@@ -445,7 +445,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
     /**
      * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去
      */
-    protected class Dispatcher extends RunnableAndCatch {
+    protected class Dispatcher extends SafeRunnable {
 
         // 数据库查询限制,每次最多查询几个任务
         private static final int DB_QUERY_LIMIT = 100;
@@ -503,7 +503,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
      * 执行器动态上线(for 秒级任务和 MR 任务)
      * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改
      */
-    protected class WorkerDetector extends RunnableAndCatch {
+    protected class WorkerDetector extends SafeRunnable {
         @Override
         public void run0() {
 

+ 4 - 4
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java

@@ -9,7 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus;
 import tech.powerjob.common.model.InstanceDetail;
 import tech.powerjob.common.request.ServerScheduleJobReq;
 import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
-import tech.powerjob.worker.background.RunnableWrapper;
+import tech.powerjob.common.enhance.SafeRunnableWrapper;
 import tech.powerjob.worker.common.WorkerRuntime;
 import tech.powerjob.worker.common.constants.TaskConstant;
 import tech.powerjob.worker.common.constants.TaskStatus;
@@ -94,14 +94,14 @@ public class LightTaskTracker extends TaskTracker {
             // 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段
             long initDelay = RandomUtils.nextInt(5000, 10000);
             // 上报任务状态
-            statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new RunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS);
+            statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new SafeRunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS);
             // 超时控制
             if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
                 if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
-                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
+                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
                 } else {
                     // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s
-                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
+                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
                 }
             } else {
                 timeoutCheckScheduledFuture = null;