|
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
|
|
|
import io.netty.channel.ChannelFuture;
|
|
|
import io.netty.channel.ChannelFutureListener;
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
|
|
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
|
|
@@ -67,7 +68,7 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
|
|
|
logger.error("task savepoint request command is null");
|
|
|
return;
|
|
|
}
|
|
|
- logger.info("task savepoint command : {}", taskSavePointRequestCommand);
|
|
|
+ logger.info("Receive task savepoint command : {}", taskSavePointRequestCommand);
|
|
|
|
|
|
int taskInstanceId = taskSavePointRequestCommand.getTaskInstanceId();
|
|
|
TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
|
|
@@ -76,9 +77,14 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- doSavePoint(taskInstanceId);
|
|
|
+ try {
|
|
|
+ LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
|
|
|
+ doSavePoint(taskInstanceId);
|
|
|
|
|
|
- sendTaskSavePointResponseCommand(channel, taskExecutionContext);
|
|
|
+ sendTaskSavePointResponseCommand(channel, taskExecutionContext);
|
|
|
+ } finally {
|
|
|
+ LoggerUtils.removeTaskInstanceIdMDC();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void sendTaskSavePointResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
|
|
@@ -89,7 +95,8 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
|
|
|
public void operationComplete(ChannelFuture future) throws Exception {
|
|
|
if (!future.isSuccess()) {
|
|
|
logger.error("Submit kill response to master error, kill command: {}", taskSavePointResponseCommand);
|
|
|
- }
|
|
|
+ } else
|
|
|
+ logger.info("Submit kill response to master success, kill command: {}", taskSavePointResponseCommand);
|
|
|
}
|
|
|
});
|
|
|
}
|