Parcourir la source

fix NPE while retry task (#12903)

Kerwin il y a 2 ans
Parent
commit
c916c60853

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java

@@ -31,7 +31,7 @@ public interface StateEvent {
 
     int getProcessInstanceId();
 
-    int getTaskInstanceId();
+    Integer getTaskInstanceId();
 
     @NonNull
     StateEventType getType();

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEvent.java

@@ -36,7 +36,7 @@ public class TaskStateEvent implements StateEvent {
     // todo: use wrapper type
     private int processInstanceId;
 
-    private int taskInstanceId;
+    private Integer taskInstanceId;
 
     private long taskCode;
 

+ 1 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEvent.java

@@ -39,7 +39,7 @@ public class WorkflowStateEvent implements StateEvent {
     /**
      * Some event may contains taskInstanceId
      */
-    private int taskInstanceId;
+    private Integer taskInstanceId;
 
     private WorkflowExecutionStatus status;
 

+ 4 - 4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@@ -211,8 +211,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
             return;
         }
         taskInstanceRetryCheckList.add(taskInstanceKey);
-        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list",
-                processInstance.getId(), taskInstance.getId());
+        logger.info("[WorkflowInstance-{}][TaskInstanceKey-{}:{}] Added task instance into retry check list",
+                processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
     }
 
     public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
@@ -344,8 +344,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                     // reset taskInstance endTime and state
                     // todo relative funtion: TaskInstance.retryTaskIntervalOverTime,
                     // WorkflowExecuteThread.cloneRetryTaskInstance
-                    logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
-                            taskInstance.getId());
+                    logger.info("[TaskInstanceKey-{}:{}]The task instance can retry, will retry this task instance",
+                            taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
                     taskInstance.setEndTime(null);
                     taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
 

+ 2 - 5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@@ -539,7 +539,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      * check if task instance exist by state event
      */
     public void checkTaskInstanceByStateEvent(TaskStateEvent stateEvent) throws StateEventHandleError {
-        if (stateEvent.getTaskInstanceId() == 0) {
+        if (stateEvent.getTaskInstanceId() == null || stateEvent.getTaskInstanceId() == 0) {
             throw new StateEventHandleError("The taskInstanceId is 0");
         }
 
@@ -562,10 +562,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      * get task instance from memory
      */
     public Optional<TaskInstance> getTaskInstance(int taskInstanceId) {
-        if (taskInstanceMap.containsKey(taskInstanceId)) {
-            return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
-        }
-        return Optional.empty();
+        return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
     }
 
     public Optional<TaskInstance> getTaskInstance(long taskCode) {

+ 3 - 3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java

@@ -79,16 +79,16 @@ public class LoggerUtils {
         return "";
     }
 
-    public static void setWorkflowAndTaskInstanceIDMDC(int workflowInstanceId, int taskInstanceId) {
+    public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) {
         setWorkflowInstanceIdMDC(workflowInstanceId);
         setTaskInstanceIdMDC(taskInstanceId);
     }
 
-    public static void setWorkflowInstanceIdMDC(int workflowInstanceId) {
+    public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) {
         MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
     }
 
-    public static void setTaskInstanceIdMDC(int taskInstanceId) {
+    public static void setTaskInstanceIdMDC(Integer taskInstanceId) {
         MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
     }