Browse Source

[Feature-15475][DinkyTask] DinkyTask supports Dinky-1.0.0 and common sql (#15479)

aiwenmo 1 year ago
parent
commit
ace0860d3d

+ 228 - 54
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java

@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,6 +45,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -51,6 +53,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 
 @Slf4j
 public class DinkyTask extends AbstractRemoteTask {
@@ -58,6 +61,9 @@ public class DinkyTask extends AbstractRemoteTask {
     private final TaskExecutionContext taskExecutionContext;
 
     private DinkyParameters dinkyParameters;
+    private String jobInstanceId;
+    private boolean status;
+    private String dinkyVersion;
 
     protected DinkyTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
@@ -79,73 +85,192 @@ public class DinkyTask extends AbstractRemoteTask {
         }
     }
 
-    // todo split handle to submit and track
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
-        try {
+        // Get dinky version
+        dinkyVersion = getDinkyVersion(this.dinkyParameters.getAddress());
+        super.handle(taskCallBack);
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        if (dinkyVersion.startsWith("0")) {
+            submitApplicationV0();
+        } else {
+            submitApplicationV1();
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() throws TaskException {
+        if (dinkyVersion.startsWith("0")) {
+            trackApplicationStatusV0();
+        } else {
+            trackApplicationStatusV1();
+        }
+    }
 
+    private void submitApplicationV0() {
+        try {
             String address = this.dinkyParameters.getAddress();
             String taskId = this.dinkyParameters.getTaskId();
             boolean isOnline = this.dinkyParameters.isOnline();
             JsonNode result;
+            String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS;
             if (isOnline) {
-                // Online dinky task, and only one job is allowed to execute
-                result = onlineTask(address, taskId);
+                // Online dinky-0.6.5 task, and only one job is allowed to execute
+                result = onlineTaskV0(address, taskId);
             } else {
-                // Submit dinky task
-                result = submitTask(address, taskId);
+                // Submit dinky-0.6.5 task
+                result = submitTaskV0(address, taskId);
             }
-            if (checkResult(result)) {
-                boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
-                String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
-                boolean finishFlag = false;
-                while (!finishFlag) {
-                    JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
-                    if (!checkResult(jobInstanceInfoResult)) {
-                        break;
-                    }
-                    String jobInstanceStatus =
-                            jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
-                    switch (jobInstanceStatus) {
-                        case DinkyTaskConstants.STATUS_FINISHED:
-                            final int exitStatusCode = mapStatusToExitCode(status);
-                            // Use address-taskId as app id
-                            setAppIds(String.format("%s-%s", address, taskId));
-                            setExitStatusCode(exitStatusCode);
-                            log.info("dinky task finished with results: {}",
-                                    result.get(DinkyTaskConstants.API_RESULT_DATAS));
-                            finishFlag = true;
-                            break;
-                        case DinkyTaskConstants.STATUS_FAILED:
-                        case DinkyTaskConstants.STATUS_CANCELED:
-                        case DinkyTaskConstants.STATUS_UNKNOWN:
-                            errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
-                                    .asText());
-                            finishFlag = true;
-                            break;
-                        default:
-                            Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
-                    }
+            if (checkResultV0(result)) {
+                status = result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean();
+                if (result.get(apiResultDatasKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID)
+                        && !(result.get(apiResultDatasKey)
+                                .get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) {
+                    jobInstanceId =
+                            result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText();
                 }
             }
-        } catch (InterruptedException ex) {
+        } catch (Exception ex) {
             Thread.currentThread().interrupt();
-            log.error("Execute dinkyTask failed", ex);
+            log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
             setExitStatusCode(EXIT_CODE_FAILURE);
-            throw new TaskException("Execute dinkyTask failed", ex);
+            throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
         }
     }
 
-    @Override
-    public void submitApplication() throws TaskException {
+    private void submitApplicationV1() {
+        try {
+            String address = this.dinkyParameters.getAddress();
+            String taskId = this.dinkyParameters.getTaskId();
+            boolean isOnline = this.dinkyParameters.isOnline();
+            JsonNode result;
+            String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA;
+            // Submit dinky-1.0.0 task
+            result = submitTaskV1(address, taskId, isOnline, generateVariables());
+            if (checkResultV1(result)) {
+                status = result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean();
+                if (result.get(apiResultDataKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID)
+                        && !(result.get(apiResultDataKey)
+                                .get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) {
+                    jobInstanceId =
+                            result.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText();
+                }
+            } else {
+                log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG + "{}", result.get(DinkyTaskConstants.API_RESULT_MSG));
+                setExitStatusCode(EXIT_CODE_FAILURE);
+                throw new TaskException(
+                        DinkyTaskConstants.SUBMIT_FAILED_MSG + result.get(DinkyTaskConstants.API_RESULT_MSG));
+            }
+        } catch (Exception ex) {
+            Thread.currentThread().interrupt();
+            log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
+        }
+    }
 
+    public void trackApplicationStatusV0() throws TaskException {
+        try {
+            String address = this.dinkyParameters.getAddress();
+            String taskId = this.dinkyParameters.getTaskId();
+            if (status && jobInstanceId == null) {
+                // Use address-taskId as app id
+                setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
+                setExitStatusCode(mapStatusToExitCode(true));
+                log.info("Dinky common sql task finished.");
+                return;
+            }
+            String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS;
+            boolean finishFlag = false;
+            while (!finishFlag) {
+                JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
+                if (!checkResultV0(jobInstanceInfoResult)) {
+                    break;
+                }
+                String jobInstanceStatus =
+                        jobInstanceInfoResult.get(apiResultDatasKey).get("status").asText();
+                switch (jobInstanceStatus) {
+                    case DinkyTaskConstants.STATUS_FINISHED:
+                        final int exitStatusCode = mapStatusToExitCode(status);
+                        // Use address-taskId as app id
+                        setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
+                        setExitStatusCode(exitStatusCode);
+                        log.info("dinky task finished with results: {}",
+                                jobInstanceInfoResult.get(apiResultDatasKey));
+                        finishFlag = true;
+                        break;
+                    case DinkyTaskConstants.STATUS_FAILED:
+                    case DinkyTaskConstants.STATUS_CANCELED:
+                    case DinkyTaskConstants.STATUS_UNKNOWN:
+                        errorHandle(
+                                jobInstanceInfoResult.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_ERROR)
+                                        .asText());
+                        finishFlag = true;
+                        break;
+                    default:
+                        Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
+                }
+            }
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
+        }
     }
 
-    @Override
-    public void trackApplicationStatus() throws TaskException {
+    public void trackApplicationStatusV1() throws TaskException {
+        try {
 
+            String address = this.dinkyParameters.getAddress();
+            String taskId = this.dinkyParameters.getTaskId();
+            if (status && jobInstanceId == null) {
+                // Use address-taskId as app id
+                setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
+                setExitStatusCode(mapStatusToExitCode(true));
+                log.info("Dinky common sql task finished.");
+                return;
+            }
+            String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA;
+            boolean finishFlag = false;
+            while (!finishFlag) {
+                JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
+                if (!checkResultV1(jobInstanceInfoResult)) {
+                    break;
+                }
+                String jobInstanceStatus =
+                        jobInstanceInfoResult.get(apiResultDataKey).get("status").asText();
+                switch (jobInstanceStatus) {
+                    case DinkyTaskConstants.STATUS_FINISHED:
+                        final int exitStatusCode = mapStatusToExitCode(status);
+                        // Use address-taskId as app id
+                        setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
+                        setExitStatusCode(exitStatusCode);
+                        log.info("dinky task finished with results: {}",
+                                jobInstanceInfoResult.get(apiResultDataKey));
+                        finishFlag = true;
+                        break;
+                    case DinkyTaskConstants.STATUS_FAILED:
+                    case DinkyTaskConstants.STATUS_CANCELED:
+                    case DinkyTaskConstants.STATUS_UNKNOWN:
+                        errorHandle(jobInstanceInfoResult.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_ERROR)
+                                .asText());
+                        finishFlag = true;
+                        break;
+                    default:
+                        Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
+                }
+            }
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
+            setExitStatusCode(EXIT_CODE_FAILURE);
+            throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
+        }
     }
-
     /**
      * map dinky task status to exitStatusCode
      *
@@ -160,15 +285,28 @@ public class DinkyTask extends AbstractRemoteTask {
         }
     }
 
-    private boolean checkResult(JsonNode result) {
-        if (result instanceof MissingNode || result == null) {
+    private boolean checkResultV0(JsonNode result) {
+        boolean isCorrect = true;
+        if (result instanceof MissingNode || result instanceof NullNode) {
+            errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
+            isCorrect = false;
+        } else if (result.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) {
+            errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG));
+            isCorrect = false;
+        }
+        return isCorrect;
+    }
+
+    private boolean checkResultV1(JsonNode result) {
+        boolean isCorrect = true;
+        if (result instanceof MissingNode || result instanceof NullNode) {
             errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
-            return false;
-        } else if (result.get("code").asInt() == DinkyTaskConstants.API_ERROR) {
-            errorHandle(result.get("msg"));
-            return false;
+            isCorrect = false;
+        } else if (!result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean()) {
+            errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG));
+            isCorrect = false;
         }
-        return true;
+        return isCorrect;
     }
 
     private void errorHandle(Object msg) {
@@ -196,18 +334,53 @@ public class DinkyTask extends AbstractRemoteTask {
                 taskId);
     }
 
-    private JsonNode submitTask(String address, String taskId) {
+    private Map<String, String> generateVariables() {
+        Map<String, String> variables = new ConcurrentHashMap<>();
+        List<Property> propertyList = JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
+        if (propertyList != null && !propertyList.isEmpty()) {
+            for (Property property : propertyList) {
+                variables.put(property.getProp(), property.getValue());
+            }
+        }
+        List<Property> localParams = this.dinkyParameters.getLocalParams();
+        if (localParams == null || localParams.isEmpty()) {
+            return variables;
+        }
+        for (Property property : localParams) {
+            variables.put(property.getProp(), property.getValue());
+        }
+        return variables;
+    }
+
+    private String getDinkyVersion(String address) {
+        JsonNode versionJsonNode = parse(doGet(address + DinkyTaskConstants.GET_VERSION, new HashMap<>()));
+        if (versionJsonNode instanceof MissingNode || versionJsonNode == null
+                || versionJsonNode.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) {
+            return "0";
+        }
+        return versionJsonNode.get(DinkyTaskConstants.API_RESULT_DATA).asText();
+    }
+
+    private JsonNode submitTaskV0(String address, String taskId) {
         Map<String, String> params = new HashMap<>();
         params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
         return parse(doGet(address + DinkyTaskConstants.SUBMIT_TASK, params));
     }
 
-    private JsonNode onlineTask(String address, String taskId) {
+    private JsonNode onlineTaskV0(String address, String taskId) {
         Map<String, String> params = new HashMap<>();
         params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
         return parse(doGet(address + DinkyTaskConstants.ONLINE_TASK, params));
     }
 
+    private JsonNode submitTaskV1(String address, String taskId, boolean isOnline, Map<String, String> variables) {
+        Map<String, Object> params = new HashMap<>();
+        params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
+        params.put(DinkyTaskConstants.PARAM_TASK_IS_ONLINE, isOnline);
+        params.put(DinkyTaskConstants.PARAM_TASK_VARIABLES, variables);
+        return parse(sendJsonStr(address + DinkyTaskConstants.SUBMIT_TASK, JSONUtils.toJsonString(params)));
+    }
+
     private JsonNode cancelTask(String address, String taskId) {
         Map<String, String> params = new HashMap<>();
         params.put(DinkyTaskConstants.PARAM_JSON_TASK_ID, taskId);
@@ -289,4 +462,5 @@ public class DinkyTask extends AbstractRemoteTask {
         }
         return result;
     }
+
 }

+ 13 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java

@@ -27,6 +27,7 @@ public class DinkyTaskConstants {
     }
 
     private static final String API_ROUTE = "/openapi/";
+    public static final String GET_VERSION = API_ROUTE + "version";
     public static final String SUBMIT_TASK = API_ROUTE + "submitTask";
     public static final String ONLINE_TASK = API_ROUTE + "onLineTask";
     public static final String SAVEPOINT_TASK = API_ROUTE + "savepointTask";
@@ -34,11 +35,19 @@ public class DinkyTaskConstants {
     public static final int API_ERROR = 1;
     public static final String API_VERSION_ERROR_TIPS =
             "Please check that the dinky version is greater than or equal to 0.6.5";
+    public static final String API_RESULT_CODE = "code";
+    public static final String API_RESULT_SUCCESS = "success";
+    public static final String API_RESULT_MSG = "msg";
+    public static final String API_RESULT_ERROR = "error";
+    public static final String API_RESULT_DATA = "data";
     public static final String API_RESULT_DATAS = "datas";
+    public static final String API_RESULT_JOB_INSTANCE_ID = "jobInstanceId";
 
     public static final String SAVEPOINT_CANCEL = "cancel";
 
     public static final String PARAM_TASK_ID = "id";
+    public static final String PARAM_TASK_IS_ONLINE = "isOnline";
+    public static final String PARAM_TASK_VARIABLES = "variables";
     public static final String PARAM_JSON_TASK_ID = "taskId";
     public static final String PARAM_SAVEPOINT_TYPE = "type";
     public static final String PARAM_JOB_INSTANCE_ID = "id";
@@ -48,6 +57,10 @@ public class DinkyTaskConstants {
     public static final String STATUS_FAILED = "FAILED";
     public static final String STATUS_UNKNOWN = "UNKNOWN";
 
+    public static final String SUBMIT_FAILED_MSG = "Submit dinkyTask failed:";
+    public static final String TRACK_FAILED_MSG = "Track dinkyTask failed:";
+    public static final String APPIDS_FORMAT = "%s-%s";
+
     public static final long SLEEP_MILLIS = 3000;
 
 }

BIN
dolphinscheduler-ui/public/images/task-icons/dinky.png


BIN
dolphinscheduler-ui/public/images/task-icons/dinky_hover.png