|
@@ -28,35 +28,48 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
|
|
|
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
|
|
|
|
|
|
import java.util.Map;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@AutoService(StateEventHandler.class)
|
|
|
public class TaskTimeoutStateEventHandler implements StateEventHandler {
|
|
|
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
|
|
|
+
|
|
|
@Override
|
|
|
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
|
|
|
- StateEvent stateEvent) throws StateEventHandleError {
|
|
|
+ StateEvent stateEvent) throws StateEventHandleError {
|
|
|
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
|
|
|
|
|
|
TaskMetrics.incTaskInstanceByState("timeout");
|
|
|
workflowExecuteRunnable.checkTaskInstanceByStateEvent(taskStateEvent);
|
|
|
|
|
|
TaskInstance taskInstance =
|
|
|
- workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
|
|
|
- () -> new StateEventHandleError(String.format(
|
|
|
- "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
|
|
|
- taskStateEvent.getTaskInstanceId())));
|
|
|
+ workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
|
|
|
+ () -> new StateEventHandleError(String.format(
|
|
|
+ "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
|
|
|
+ taskStateEvent.getTaskInstanceId())));
|
|
|
|
|
|
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
|
|
|
return true;
|
|
|
}
|
|
|
- TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
|
|
|
- Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
|
|
|
- if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
|
|
|
- || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
|
|
|
- ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
|
|
|
- taskProcessor.action(TaskAction.TIMEOUT);
|
|
|
+ TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine()
|
|
|
+ .getTimeoutNotifyStrategy();
|
|
|
+ Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable
|
|
|
+ .getActiveTaskProcessMap();
|
|
|
+ if ((TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
|
|
|
+ || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
|
|
|
+ if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
|
|
|
+ ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
|
|
|
+ taskProcessor.action(TaskAction.TIMEOUT);
|
|
|
+ } else {
|
|
|
+ logger.warn(
|
|
|
+ "cannot find the task processor for task {}, so skip task processor action.",
|
|
|
+ taskInstance.getTaskCode());
|
|
|
+ }
|
|
|
}
|
|
|
- if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
|
|
|
+ if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
|
|
|
+ || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
|
|
|
workflowExecuteRunnable.processTimeout();
|
|
|
workflowExecuteRunnable.taskTimeout(taskInstance);
|
|
|
}
|