Browse Source

Recreate new TaskInstance Working Directory when exist in worker (#15358)

Wenjun Ruan 1 year ago
parent
commit
0e88ea3ac8

+ 1 - 1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java

@@ -217,7 +217,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
         taskExecutionContext.setTenantCode(tenant);
         log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());
 
-        TaskExecutionContextUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
+        TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
         log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());
 
         TaskChannel taskChannel =

+ 21 - 14
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java

@@ -80,22 +80,29 @@ public class TaskExecutionContextUtils {
         }
     }
 
-    public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException {
+    public static void createTaskInstanceWorkingDirectory(TaskExecutionContext taskExecutionContext) throws TaskException {
+        // local execute path
+        String taskInstanceWorkingDirectory = FileUtils.getProcessExecDir(
+                taskExecutionContext.getTenantCode(),
+                taskExecutionContext.getProjectCode(),
+                taskExecutionContext.getProcessDefineCode(),
+                taskExecutionContext.getProcessDefineVersion(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId());
         try {
-            // local execute path
-            String execLocalPath = FileUtils.getProcessExecDir(
-                    taskExecutionContext.getTenantCode(),
-                    taskExecutionContext.getProjectCode(),
-                    taskExecutionContext.getProcessDefineCode(),
-                    taskExecutionContext.getProcessDefineVersion(),
-                    taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId());
-            taskExecutionContext.setExecutePath(execLocalPath);
-            taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(execLocalPath));
-            Path executePath = Paths.get(taskExecutionContext.getExecutePath());
-            FileUtils.createDirectoryIfNotPresent(executePath);
+            Path path = Paths.get(taskInstanceWorkingDirectory);
+            if (Files.deleteIfExists(path)) {
+                log.warn("The TaskInstance WorkingDirectory: {} is exist, will recreate again",
+                        taskInstanceWorkingDirectory);
+            }
+            Files.createDirectories(path);
+            taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory);
+
+            taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory);
+            taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(taskInstanceWorkingDirectory));
         } catch (Throwable ex) {
-            throw new TaskException("Cannot create process execute dir", ex);
+            throw new TaskException(
+                    "Cannot create TaskInstance WorkingDirectory: " + taskInstanceWorkingDirectory + " failed", ex);
         }
     }