Selaa lähdekoodia

fix: Repetitive execution of frequency tasks #375

Echo009 3 vuotta sitten
vanhempi
commit
56447596f7

+ 23 - 2
powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java

@@ -5,8 +5,11 @@ import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import tech.powerjob.common.enums.InstanceStatus;
+import tech.powerjob.common.enums.Protocol;
 import tech.powerjob.common.enums.TimeExpressionType;
+import tech.powerjob.common.request.ServerStopInstanceReq;
 import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
+import tech.powerjob.server.common.module.WorkerInfo;
 import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder;
 import tech.powerjob.server.common.utils.SpringUtils;
 import tech.powerjob.server.core.service.UserService;
@@ -17,10 +20,13 @@ import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
 import tech.powerjob.server.persistence.remote.model.JobInfoDO;
 import tech.powerjob.server.persistence.remote.model.UserInfoDO;
 import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
+import tech.powerjob.server.remote.transport.TransportService;
+import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
 
 import javax.annotation.Resource;
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -44,7 +50,10 @@ public class InstanceManager {
     private InstanceInfoRepository instanceInfoRepository;
     @Resource
     private WorkflowInstanceManager workflowInstanceManager;
-
+    @Resource
+    private TransportService transportService;
+    @Resource
+    private WorkerClusterQueryService workerClusterQueryService;
 
     /**
      * 更新任务状态
@@ -74,7 +83,7 @@ public class InstanceManager {
         }
         // 丢弃非目标 TaskTracker 的上报数据(脑裂情况)
         if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {
-            log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress());
+            log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will be dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress());
             return;
         }
 
@@ -88,6 +97,18 @@ public class InstanceManager {
         // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)
         // 综上,直接把 status 和 runningNum 同步到DB即可
         if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
+            // 如果实例处于失败状态,则说明该 worker 失联了一段时间,被 server 判定为宕机,而此时该秒级任务有可能已经重新派发了,故需要 Kill 掉该实例
+            // fix issue 375
+            if (instanceInfo.getStatus() == InstanceStatus.FAILED.getV()) {
+                log.warn("[InstanceManager-{}] receive TaskTracker's report: {}, but current instance is already failed, this instance should be killed.", instanceId, req);
+                Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
+                if (workerInfoOpt.isPresent()){
+                    ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId);
+                    WorkerInfo workerInfo = workerInfoOpt.get();
+                    transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq);
+                }
+                return;
+            }
             instanceInfo.setStatus(receivedInstanceStatus.getV());
             instanceInfo.setResult(req.getResult());
             instanceInfo.setRunningTimes(req.getTotalTaskNum());