|
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
|
|
|
import org.apache.dolphinscheduler.common.enums.ProgramType;
|
|
|
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
@@ -32,46 +33,58 @@ import java.util.List;
|
|
|
public class FlinkArgsUtils {
|
|
|
|
|
|
|
|
|
- * build args
|
|
|
+ * build args
|
|
|
+ *
|
|
|
* @param param
|
|
|
* @return
|
|
|
*/
|
|
|
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(FlinkArgsUtils.class);
|
|
|
+
|
|
|
public static List<String> buildArgs(FlinkParameters param) {
|
|
|
List<String> args = new ArrayList<>();
|
|
|
+ String deployMode = "cluster";
|
|
|
+ if (StringUtils.isNotEmpty(param.getDeployMode())) {
|
|
|
+ deployMode = param.getDeployMode();
|
|
|
|
|
|
- args.add(Constants.FLINK_RUN_MODE);
|
|
|
+ }
|
|
|
+ if (!"local".equals(deployMode)) {
|
|
|
+ args.add(Constants.FLINK_RUN_MODE);
|
|
|
|
|
|
- args.add(Constants.FLINK_YARN_CLUSTER);
|
|
|
+ args.add(Constants.FLINK_YARN_CLUSTER);
|
|
|
|
|
|
- if (param.getSlot() != 0) {
|
|
|
- args.add(Constants.FLINK_YARN_SLOT);
|
|
|
- args.add(String.format("%d", param.getSlot()));
|
|
|
- }
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(param.getAppName())) {
|
|
|
- args.add(Constants.FLINK_APP_NAME);
|
|
|
- args.add(param.getAppName());
|
|
|
- }
|
|
|
+ if (param.getSlot() != 0) {
|
|
|
+ args.add(Constants.FLINK_YARN_SLOT);
|
|
|
+ args.add(String.format("%d", param.getSlot()));
|
|
|
+ }
|
|
|
|
|
|
- if (param.getTaskManager() != 0) {
|
|
|
- args.add(Constants.FLINK_TASK_MANAGE);
|
|
|
- args.add(String.format("%d", param.getTaskManager()));
|
|
|
- }
|
|
|
+ if (StringUtils.isNotEmpty(param.getAppName())) {
|
|
|
+ args.add(Constants.FLINK_APP_NAME);
|
|
|
+ args.add(param.getAppName());
|
|
|
+ }
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(param.getJobManagerMemory())) {
|
|
|
- args.add(Constants.FLINK_JOB_MANAGE_MEM);
|
|
|
- args.add(param.getJobManagerMemory());
|
|
|
- }
|
|
|
+ if (param.getTaskManager() != 0) {
|
|
|
+ args.add(Constants.FLINK_TASK_MANAGE);
|
|
|
+ args.add(String.format("%d", param.getTaskManager()));
|
|
|
+ }
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) {
|
|
|
- args.add(Constants.FLINK_TASK_MANAGE_MEM);
|
|
|
- args.add(param.getTaskManagerMemory());
|
|
|
- }
|
|
|
- args.add(Constants.FLINK_detach);
|
|
|
+ if (StringUtils.isNotEmpty(param.getJobManagerMemory())) {
|
|
|
+ args.add(Constants.FLINK_JOB_MANAGE_MEM);
|
|
|
+ args.add(param.getJobManagerMemory());
|
|
|
+ }
|
|
|
|
|
|
+ if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) {
|
|
|
+ args.add(Constants.FLINK_TASK_MANAGE_MEM);
|
|
|
+ args.add(param.getTaskManagerMemory());
|
|
|
+ }
|
|
|
+
|
|
|
+ args.add(Constants.FLINK_detach);
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
- if(param.getProgramType() !=null ){
|
|
|
- if(param.getProgramType()!=ProgramType.PYTHON){
|
|
|
+ if (param.getProgramType() != null) {
|
|
|
+ if (param.getProgramType() != ProgramType.PYTHON) {
|
|
|
if (StringUtils.isNotEmpty(param.getMainClass())) {
|
|
|
args.add(Constants.FLINK_MAIN_CLASS);
|
|
|
args.add(param.getMainClass());
|
|
@@ -83,28 +96,29 @@ public class FlinkArgsUtils {
|
|
|
args.add(param.getMainJar().getRes());
|
|
|
}
|
|
|
|
|
|
+ if (StringUtils.isNotEmpty(param.getMainArgs())) {
|
|
|
+ args.add(param.getMainArgs());
|
|
|
+ }
|
|
|
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(param.getOthers())) {
|
|
|
+ if (StringUtils.isNotEmpty(param.getOthers())) {
|
|
|
String others = param.getOthers();
|
|
|
- if(!others.contains("--queue")){
|
|
|
- if (StringUtils.isNotEmpty(param.getQueue())) {
|
|
|
- args.add(Constants.SPARK_QUEUE);
|
|
|
+ if (!others.contains("--qu")) {
|
|
|
+ if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) {
|
|
|
+ args.add(Constants.FLINK_QUEUE);
|
|
|
args.add(param.getQueue());
|
|
|
}
|
|
|
}
|
|
|
args.add(param.getOthers());
|
|
|
- }else if (StringUtils.isNotEmpty(param.getQueue())) {
|
|
|
- args.add(Constants.SPARK_QUEUE);
|
|
|
+ } else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) {
|
|
|
+ args.add(Constants.FLINK_QUEUE);
|
|
|
args.add(param.getQueue());
|
|
|
|
|
|
}
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(param.getMainArgs())) {
|
|
|
- args.add(param.getMainArgs());
|
|
|
- }
|
|
|
|
|
|
return args;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
}
|