Browse Source

[Improvement][K8S]Optimize MDC for K8S tasks (#15390)

Signed-off-by: Gallardot <gallardot@apache.org>
Co-authored-by: fuchanghai <changhaifu@apache.org>
Gallardot 1 year ago
parent
commit
c7a11ce6ed

+ 9 - 1
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

@@ -205,6 +205,8 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
             @Override
             public void eventReceived(Action action, Job job) {
                 try {
+                    LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                            taskRequest.getTaskInstanceId());
                     LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                     log.info("event received : job:{} action:{}", job.getMetadata().getName(), action);
                     if (action == Action.DELETED) {
@@ -222,14 +224,18 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
                     }
                 } finally {
                     LogUtils.removeTaskInstanceLogFullPathMDC();
+                    LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                 }
             }
 
             @Override
             public void onClose(WatcherException e) {
+                LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                        taskRequest.getTaskInstanceId());
                 log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), e.getMessage());
                 taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
                 countDownLatch.countDown();
+                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         };
         try (Watch watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) {
@@ -260,10 +266,12 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
         String containerName = String.format("%s-%s", taskName, taskInstanceId);
         podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
             TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
+            LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                    taskRequest.getTaskInstanceId());
+            LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
             try (
                     LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
                             taskRequest.getTaskAppId(), containerName)) {
-                LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                 String line;
                 try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) {
                     while ((line = reader.readLine()) != null) {