Przeglądaj źródła

[bug][plugin]Fix: Correct the way to determine the yarn queue in Flink CommandLine and SQL mode (#14237)

* Fix: Correct the way to determine the yarn queue in Flink CommandLine

* fix the yarn queue in sql mode && refine the code

* refine code

* remove unnecessary comment

* fix yarn queue properties

* remove redundant variable
ORuteMa 1 rok temu
rodzic
commit
de2cc0e235

+ 37 - 13
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

@@ -164,12 +164,9 @@ public class FlinkArgsUtils {
             }
 
             // yarn.application.queue
-            String others = flinkParameters.getOthers();
-            if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
-                String queue = flinkParameters.getQueue();
-                if (StringUtils.isNotEmpty(queue)) {
-                    initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
-                }
+            String queue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
             }
         }
 
@@ -254,13 +251,6 @@ public class FlinkArgsUtils {
                     args.add(taskManagerMemory);
                 }
 
-                if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
-                    String queue = flinkParameters.getQueue();
-                    if (StringUtils.isNotEmpty(queue)) { // -yqu
-                        args.add(FlinkConstants.FLINK_QUEUE);
-                        args.add(queue);
-                    }
-                }
                 break;
             case LOCAL:
                 break;
@@ -306,6 +296,40 @@ public class FlinkArgsUtils {
             args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
         }
 
+        // determine yarn queue
+        determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
         return args;
     }
+
+    private static void determinedYarnQueue(List<String> args, FlinkParameters flinkParameters,
+                                            FlinkDeployMode deployMode, String flinkVersion) {
+        switch (deployMode) {
+            case CLUSTER:
+                if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
+                        || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
+                    doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                } else {
+                    doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                }
+            case APPLICATION:
+                doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+        }
+    }
+
+    private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isEmpty(others) || !others.contains(option)) {
+            String queue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                switch (option) {
+                    case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
+                        args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
+                    case FlinkConstants.FLINK_QUEUE_FOR_MODE:
+                        args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                        args.add(queue);
+                }
+            }
+        }
+    }
+
 }

+ 2 - 1
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@@ -48,7 +48,8 @@ public class FlinkConstants {
     public static final String FLINK_EXECUTION_TARGET = "-t";
     public static final String FLINK_YARN_SLOT = "-ys";
     public static final String FLINK_APP_NAME = "-ynm";
-    public static final String FLINK_QUEUE = "-yqu";
+    public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
+    public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
     public static final String FLINK_TASK_MANAGE = "-yn";
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
     public static final String FLINK_TASK_MANAGE_MEM = "-ytm";