Kerwin 3 лет назад
Родитель
Сommit
18047881d5

+ 0 - 5
dolphinscheduler-server/pom.xml

@@ -42,11 +42,6 @@
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-spi</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-task-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -44,6 +43,7 @@ import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 import java.util.Date;

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@@ -39,6 +38,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 import java.util.Collections;

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.RetryerUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -39,6 +38,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskChannel;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 import org.apache.commons.collections.MapUtils;

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@@ -22,13 +22,13 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 import java.util.concurrent.DelayQueue;

+ 70 - 1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
-        JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+    JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
 @Ignore
 public class TaskExecuteProcessorTest {
 
@@ -74,10 +74,79 @@ public class TaskExecuteProcessorTest {
 
     private TaskExecuteRequestCommand taskRequestCommand;
 
+
     private AlertClientService alertClientService;
 
     private WorkerManagerThread workerManager;
 
+    @Before
+    public void before() throws Exception {
+        // init task execution context
+        taskExecutionContext = getTaskExecutionContext();
+        workerConfig = new WorkerConfig();
+        workerConfig.setWorkerExecThreads(1);
+        workerConfig.setListenPort(1234);
+        command = new Command();
+        command.setType(CommandType.TASK_EXECUTE_REQUEST);
+        ackCommand = new TaskExecuteAckCommand().convert2Command();
+        taskRequestCommand = new TaskExecuteRequestCommand();
+        alertClientService = PowerMockito.mock(AlertClientService.class);
+        workerExecService = PowerMockito.mock(ExecutorService.class);
+        PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
+                .thenReturn(null);
+
+        PowerMockito.mockStatic(ChannelUtils.class);
+        PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
+
+        taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
+        PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
+                .thenReturn(taskCallbackService);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
+                .thenReturn(workerConfig);
+
+        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                taskExecutionContext.getProcessDefineCode(),
+                taskExecutionContext.getProcessDefineVersion(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()));
+
+        workerManager = PowerMockito.mock(WorkerManagerThread.class);
+        PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);
+
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
+                .thenReturn(workerManager);
+
+        PowerMockito.mockStatic(ThreadUtils.class);
+        PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()))
+                .thenReturn(workerExecService);
+
+        PowerMockito.mockStatic(JsonSerializer.class);
+        PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
+                .thenReturn(taskRequestCommand);
+
+        PowerMockito.mockStatic(JSONUtils.class);
+        PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
+                .thenReturn(taskRequestCommand);
+        PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
+                .thenReturn(taskExecutionContext);
+
+        PowerMockito.mockStatic(FileUtils.class);
+        PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
+                taskExecutionContext.getProcessDefineCode(),
+                taskExecutionContext.getProcessDefineVersion(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()))
+                .thenReturn(taskExecutionContext.getExecutePath());
+        PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
+
+        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
+        PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
+                .thenReturn(simpleTaskExecuteThread);
+    }
+
     @Test
     public void testNormalExecution() {
         TaskExecuteProcessor processor = new TaskExecuteProcessor();

+ 1 - 1
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.spi.task;
 
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 

+ 6 - 8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.util.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.util.ThreadUtils;
 import org.apache.dolphinscheduler.spi.task.TaskConstants;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
@@ -158,7 +159,7 @@ public abstract class AbstractCommandExecutor {
         // parse process output
         parseProcessOutput(process);
 
-        int processId = getProcessId(process);
+        Integer processId = getProcessId(process);
 
         result.setProcessId(processId);
 
@@ -167,7 +168,6 @@ public abstract class AbstractCommandExecutor {
         boolean updateTaskExecutionContextStatus = TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
         if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
             ProcessUtils.kill(taskRequest);
-            result.setStatus(TaskRunStatus.FAIL_AND_NEED_KILL);
             result.setExitStatusCode(EXIT_CODE_KILL);
             return result;
         }
@@ -179,10 +179,6 @@ public abstract class AbstractCommandExecutor {
 
         // waiting for the run to finish
         boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
-        logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
-                taskRequest.getExecutePath(),
-                processId
-                , result.getExitStatusCode());
 
         // if SHELL task exit
         if (status) {
@@ -194,12 +190,14 @@ public abstract class AbstractCommandExecutor {
             result.setExitStatusCode(process.exitValue());
 
         } else {
-            logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode());
+            logger.error("process has failure , exitStatusCode:{}, processExitValue:{}, ready to kill ...",
+                    result.getExitStatusCode(), process.exitValue());
             ProcessUtils.kill(taskRequest);
-            result.setStatus(TaskRunStatus.FAIL_AND_NEED_KILL);
             result.setExitStatusCode(EXIT_CODE_FAILURE);
         }
 
+        logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
+                taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, process.exitValue());
         return result;
 
     }

+ 5 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/pom.xml

@@ -39,6 +39,11 @@
             <artifactId>dolphinscheduler-task-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 49 - 54
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@@ -17,29 +17,30 @@
 
 package org.apache.dolphinscheduler.plugin.task.shell;
 
+import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.spi.task.TaskConstants.RWXR_XR_X;
+
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
-import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.util.OSUtils;
 import org.apache.dolphinscheduler.spi.task.AbstractParameters;
-import org.apache.dolphinscheduler.spi.task.Direct;
 import org.apache.dolphinscheduler.spi.task.Property;
-import org.apache.dolphinscheduler.spi.task.TaskConstants;
+import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
+import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
+import org.apache.commons.collections4.MapUtils;
+
 import java.io.File;
-import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -61,49 +62,47 @@ public class ShellTask extends AbstractTaskExecutor {
     /**
      * taskExecutionContext
      */
-    private TaskRequest taskRequest;
-
-    private String command;
+    private TaskRequest taskExecutionContext;
 
     /**
      * constructor
      *
-     * @param taskRequest taskRequest
+     * @param taskExecutionContext taskExecutionContext
      */
-    public ShellTask(TaskRequest taskRequest) {
-        super(taskRequest);
+    public ShellTask(TaskRequest taskExecutionContext) {
+        super(taskExecutionContext);
 
-        this.taskRequest = taskRequest;
+        this.taskExecutionContext = taskExecutionContext;
         this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskRequest,
+                taskExecutionContext,
                 logger);
     }
 
     @Override
     public void init() {
-        logger.info("shell task params {}", taskRequest.getTaskParams());
+        logger.info("shell task params {}", taskExecutionContext.getTaskParams());
 
-        shellParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), ShellParameters.class);
+        shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
 
-        assert shellParameters != null;
         if (!shellParameters.checkParameters()) {
             throw new RuntimeException("shell task params is not valid");
         }
     }
 
     @Override
-    public void handle() {
+    public void handle() throws Exception {
         try {
             // construct process
-            TaskResponse response = shellCommandExecutor.run(command);
-            setExitStatusCode(response.getExitStatusCode());
-            setAppIds(response.getAppIds());
-            setProcessId(response.getProcessId());
-            setResult(shellCommandExecutor.getTaskResultString());
+            String command = buildCommand();
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
+            setExitStatusCode(commandExecuteResult.getExitStatusCode());
+            setAppIds(commandExecuteResult.getAppIds());
+            setProcessId(commandExecuteResult.getProcessId());
+            shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
         } catch (Exception e) {
             logger.error("shell task error", e);
-            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
-            throw new TaskException("shell task error", e);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw e;
         }
     }
 
@@ -113,36 +112,32 @@ public class ShellTask extends AbstractTaskExecutor {
         shellCommandExecutor.cancelApplication();
     }
 
-    @Override
-    public String getPreScript() {
-        return shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
-    }
-
     /**
-     * set command
+     * create command
      *
-     * @throws IOException exception
+     * @return file name
+     * @throws Exception exception
      */
-    @Override
-    public void setCommand(String command) throws IOException {
+    private String buildCommand() throws Exception {
         // generate scripts
         String fileName = String.format("%s/%s_node.%s",
-                taskRequest.getExecutePath(),
-                taskRequest.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
+                taskExecutionContext.getExecutePath(),
+                taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
 
         Path path = new File(fileName).toPath();
 
         if (Files.exists(path)) {
-            this.command = fileName;
-            return;
+            return fileName;
         }
-        this.command = command;
-        shellParameters.setRawScript(command);
+
+        String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
+        script = parseScript(script);
+        shellParameters.setRawScript(script);
 
         logger.info("raw script : {}", shellParameters.getRawScript());
-        logger.info("task execute path : {}", taskRequest.getExecutePath());
+        logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
 
-        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(TaskConstants.RWXR_XR_X);
+        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
         FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
 
         if (OSUtils.isWindows()) {
@@ -152,7 +147,8 @@ public class ShellTask extends AbstractTaskExecutor {
         }
 
         Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
-        this.command = fileName;
+
+        return fileName;
     }
 
     @Override
@@ -160,16 +156,15 @@ public class ShellTask extends AbstractTaskExecutor {
         return shellParameters;
     }
 
-    public void setResult(String result) {
-        Map<String, Property> localParams = shellParameters.getLocalParametersMap();
-        List<Map<String, String>> outProperties = new ArrayList<>();
-        Map<String, String> p = new HashMap<>();
-        localParams.forEach((k, v) -> {
-            if (v.getDirect() == Direct.OUT) {
-                p.put(k, result);
-            }
-        });
-        outProperties.add(p);
-        resultString = JSONUtils.toJsonString(outProperties);
+    private String parseScript(String script) {
+        // combining local and global parameters
+        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
+        return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
     }
 }