|
@@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
|
|
|
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.dq.rule.RuleManager;
|
|
|
import org.apache.dolphinscheduler.plugin.task.dq.rule.parameter.DataQualityConfiguration;
|
|
|
import org.apache.dolphinscheduler.plugin.task.dq.utils.spark.SparkArgsUtils;
|
|
@@ -52,6 +53,7 @@ import java.io.File;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -160,8 +162,6 @@ public class DataQualityTask extends AbstractYarnTask {
|
|
|
List<String> args = new ArrayList<>();
|
|
|
|
|
|
args.add(SPARK2_COMMAND);
|
|
|
-
|
|
|
- // other parameters
|
|
|
args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters()));
|
|
|
|
|
|
// replace placeholder
|
|
@@ -169,10 +169,15 @@ public class DataQualityTask extends AbstractYarnTask {
|
|
|
|
|
|
String command = null;
|
|
|
|
|
|
- if (null != paramsMap) {
|
|
|
- command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
|
|
|
+ if (MapUtils.isEmpty(paramsMap)) {
|
|
|
+ paramsMap = new HashMap<>();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (MapUtils.isNotEmpty(dqTaskExecutionContext.getParamsMap())) {
|
|
|
+ paramsMap.putAll(dqTaskExecutionContext.getParamsMap());
|
|
|
}
|
|
|
|
|
|
+ command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
|
|
|
logger.info("data quality task command: {}", command);
|
|
|
|
|
|
return command;
|
|
@@ -181,8 +186,8 @@ public class DataQualityTask extends AbstractYarnTask {
|
|
|
@Override
|
|
|
protected void setMainJarName() {
|
|
|
ResourceInfo mainJar = new ResourceInfo();
|
|
|
- String basePath = System.getProperty("user.dir").replace(File.separator + "bin", File.separator + "libs");
|
|
|
- mainJar.setRes(basePath + File.separator + CommonUtils.getDataQualityJarName());
|
|
|
+ String basePath = System.getProperty("user.dir").replace(File.separator + "bin", "");
|
|
|
+ mainJar.setRes(basePath + File.separator + "libs" + File.separator + CommonUtils.getDataQualityJarName());
|
|
|
dataQualityParameters.getSparkParameters().setMainJar(mainJar);
|
|
|
}
|
|
|
|