|
@@ -18,8 +18,10 @@
|
|
|
package org.apache.dolphinscheduler.plugin.task.hivecli;
|
|
|
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
|
|
|
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
|
|
+import org.apache.dolphinscheduler.common.utils.OSUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
|
|
@@ -31,12 +33,23 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
|
|
|
|
|
|
+import org.apache.commons.io.FileUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Path;
|
|
|
+import java.nio.file.StandardOpenOption;
|
|
|
+import java.nio.file.attribute.FileAttribute;
|
|
|
+import java.nio.file.attribute.PosixFilePermission;
|
|
|
+import java.nio.file.attribute.PosixFilePermissions;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
public class HiveCliTask extends AbstractRemoteTask {
|
|
|
|
|
@@ -108,29 +121,41 @@ public class HiveCliTask extends AbstractRemoteTask {
|
|
|
|
|
|
final String type = hiveCliParameters.getHiveCliTaskExecutionType();
|
|
|
|
|
|
+ String sqlContent = "";
|
|
|
+ String resourceFileName = "";
|
|
|
// TODO: make sure type is not unknown
|
|
|
if (HiveCliConstants.TYPE_FILE.equals(type)) {
|
|
|
- args.add(HiveCliConstants.HIVE_CLI_EXECUTE_FILE);
|
|
|
final List<ResourceInfo> resourceInfos = hiveCliParameters.getResourceList();
|
|
|
if (resourceInfos.size() > 1) {
|
|
|
log.warn("more than 1 files detected, use the first one by default");
|
|
|
}
|
|
|
|
|
|
- args.add(StringUtils.stripStart(resourceInfos.get(0).getResourceName(), "/"));
|
|
|
+ try {
|
|
|
+ resourceFileName = resourceInfos.get(0).getResourceName();
|
|
|
+ sqlContent = FileUtils.readFileToString(
|
|
|
+ new File(String.format("%s/%s", taskExecutionContext.getExecutePath(), resourceFileName)),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("read hive sql content from file {} error ", resourceFileName, e);
|
|
|
+ throw new TaskException("read hive sql content error", e);
|
|
|
+ }
|
|
|
} else {
|
|
|
- final String script = hiveCliParameters.getHiveSqlScript();
|
|
|
- args.add(String.format(HiveCliConstants.HIVE_CLI_EXECUTE_SCRIPT, script));
|
|
|
+ sqlContent = hiveCliParameters.getHiveSqlScript();
|
|
|
}
|
|
|
|
|
|
+ final Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
|
|
|
+ sqlContent = ParameterUtils.convertParameterPlaceholders(sqlContent, ParameterUtils.convert(paramsMap));
|
|
|
+ log.info("HiveCli sql content: {}", sqlContent);
|
|
|
+ String sqlFilePath = generateSqlScriptFile(sqlContent);
|
|
|
+
|
|
|
+ args.add(HiveCliConstants.HIVE_CLI_EXECUTE_FILE);
|
|
|
+ args.add(new File(sqlFilePath).getName());
|
|
|
final String hiveCliOptions = hiveCliParameters.getHiveCliOptions();
|
|
|
if (StringUtils.isNotEmpty(hiveCliOptions)) {
|
|
|
args.add(hiveCliOptions);
|
|
|
}
|
|
|
|
|
|
- final Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
|
|
|
- final String command =
|
|
|
- ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
|
|
|
-
|
|
|
+ String command = String.join(" ", args);
|
|
|
log.info("hiveCli task command: {}", command);
|
|
|
|
|
|
return command;
|
|
@@ -151,4 +176,34 @@ public class HiveCliTask extends AbstractRemoteTask {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ protected String generateSqlScriptFile(String rawScript) {
|
|
|
+ String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(),
|
|
|
+ taskExecutionContext.getTaskAppId());
|
|
|
+
|
|
|
+ File file = new File(scriptFileName);
|
|
|
+ Path path = file.toPath();
|
|
|
+
|
|
|
+ if (!Files.exists(path)) {
|
|
|
+ String script = rawScript.replaceAll("\\r\\n", "\n");
|
|
|
+
|
|
|
+ Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
|
|
|
+ FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
|
|
|
+ try {
|
|
|
+ if (OSUtils.isWindows()) {
|
|
|
+ Files.createFile(path);
|
|
|
+ } else {
|
|
|
+ if (!file.getParentFile().exists()) {
|
|
|
+ file.getParentFile().mkdirs();
|
|
|
+ }
|
|
|
+ Files.createFile(path, attr);
|
|
|
+ }
|
|
|
+ Files.write(path, script.getBytes(), StandardOpenOption.APPEND);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("generate hivecli sql script error", e);
|
|
|
+ throw new TaskException("generate hivecli sql script error", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return scriptFileName;
|
|
|
+ }
|
|
|
+
|
|
|
}
|