Browse Source

[Fix][Task] Wrong complement date (#6009) (#6186)

* [Fix][Task] Wrong complement date (#6009)
Kirs 3 years ago
parent
commit
fced3892ee

+ 23 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@@ -17,6 +17,10 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import static java.util.Calendar.DAY_OF_MONTH;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -163,6 +167,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
                     taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId()));
 
+            preBuildBusinessParams();
+
             TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
             if (null == taskChannel) {
                 throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
@@ -359,4 +365,21 @@ public class TaskExecuteThread implements Runnable, Delayed {
         }
         return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
     }
+
+    private void preBuildBusinessParams() {
+        Map<String, Property> paramsMap = new HashMap<>();
+        // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+        if (taskExecutionContext.getScheduleTime() != null) {
+            Date date = taskExecutionContext.getScheduleTime();
+            if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
+                date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
+            }
+            String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
+            Property p = new Property();
+            p.setValue(dateTime);
+            p.setProp(Constants.PARAMETER_DATETIME);
+            paramsMap.put(Constants.PARAMETER_DATETIME, p);
+        }
+        taskExecutionContext.setParamsMap(paramsMap);
+    }
 }

+ 12 - 6
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.python;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
 import org.apache.dolphinscheduler.spi.task.AbstractParameters;
 import org.apache.dolphinscheduler.spi.task.Property;
 import org.apache.dolphinscheduler.spi.task.TaskConstants;
@@ -27,6 +28,8 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
 import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -51,7 +54,6 @@ public class PythonTask extends AbstractTaskExecutor {
 
     private TaskRequest taskRequest;
 
-
     /**
      * constructor
      *
@@ -101,7 +103,7 @@ public class PythonTask extends AbstractTaskExecutor {
         } catch (Exception e) {
             logger.error("python task failure", e);
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
-            throw new TaskException("run python task error",e);
+            throw new TaskException("run python task error", e);
         }
     }
 
@@ -150,6 +152,7 @@ public class PythonTask extends AbstractTaskExecutor {
 
     /**
      * build command
+     *
      * @return raw python script
      * @throws Exception exception
      */
@@ -157,10 +160,14 @@ public class PythonTask extends AbstractTaskExecutor {
         String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
 
         // replace placeholder
-        Map<String, Property> paramsMap = ParamUtils.convert(taskRequest,pythonParameters);
-        if (paramsMap != null){
-            rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
+        Map<String, Property> paramsMap = ParamUtils.convert(taskRequest, pythonParameters);
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
         }
+        if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) {
+            paramsMap.putAll(taskRequest.getParamsMap());
+        }
+        rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
 
         logger.info("raw python script : {}", pythonParameters.getRawScript());
         logger.info("task dir : {}", taskDir);
@@ -168,5 +175,4 @@ public class PythonTask extends AbstractTaskExecutor {
         return rawPythonScript;
     }
 
-
 }

+ 12 - 6
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator;
 import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
+import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
 import org.apache.dolphinscheduler.spi.task.AbstractParameters;
 import org.apache.dolphinscheduler.spi.task.Property;
 import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
@@ -27,6 +28,7 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -53,7 +55,7 @@ public class SqoopTask extends AbstractYarnTask {
     public void init() {
         logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
         sqoopParameters =
-            JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
+                JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
         //check sqoop task params
         if (null == sqoopParameters) {
             throw new IllegalArgumentException("Sqoop Task params is null");
@@ -73,13 +75,17 @@ public class SqoopTask extends AbstractYarnTask {
         // combining local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext, getParameters());
 
-        if (paramsMap != null) {
-            String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
-            logger.info("sqoop script: {}", resultScripts);
-            return resultScripts;
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
         }
+        if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
+        }
+
+        String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
+        logger.info("sqoop script: {}", resultScripts);
+        return resultScripts;
 
-        return null;
     }
 
     @Override