Bläddra i källkod

[Feature][Flink] Support parallelism input (#4937)

* [Feature][Flink] Support parallelism input

* [Feature][Flink] Fix check style

* [Feature][Flink] Remove FLINK_DETACH
Shiwen Cheng 4 år sedan
förälder
incheckning
24fda3fbb1

+ 5 - 2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -842,7 +842,9 @@ public final class Constants {
      */
     public static final String HIVE_CONF = "hiveconf:";
 
-    //flink ??
+    /**
+     * flink
+     */
     public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
     public static final String FLINK_RUN_MODE = "-m";
     public static final String FLINK_YARN_SLOT = "-ys";
@@ -852,8 +854,9 @@ public final class Constants {
 
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
     public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
-    public static final String FLINK_DETACH = "-d";
     public static final String FLINK_MAIN_CLASS = "-c";
+    public static final String FLINK_PARALLELISM = "-p";
+    public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
 
 
     public static final int[] NOT_TERMINATED_STATES = new int[] {

+ 199 - 187
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.task.flink;
 
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,201 +29,213 @@ import java.util.List;
  */
 public class FlinkParameters extends AbstractParameters {
 
-  /**
-   * major jar
-   */
-  private ResourceInfo mainJar;
-
-  /**
-   * major class
-   */
-  private String mainClass;
-
-  /**
-   * deploy mode  yarn-cluster  yarn-client  yarn-local
-    */
-  private String deployMode;
-
-  /**
-   * arguments
-   */
-  private String mainArgs;
-
-  /**
-   * slot count
-   */
-  private int slot;
-
-  /**
-   *Yarn application name
-   */
-
-  private String appName;
-
-  /**
-   * taskManager count
-   */
-  private int  taskManager;
-
-  /**
-   * job manager memory
-   */
-  private String  jobManagerMemory ;
-
-  /**
-   * task manager memory
-   */
-  private String  taskManagerMemory;
-
-  /**
-   * resource list
-   */
-  private List<ResourceInfo> resourceList = new ArrayList<>();
-
-  /**
-   * The YARN queue to submit to
-   */
-  private String queue;
-
-  /**
-   * other arguments
-   */
-  private String others;
-
-  /**
-   * flink version
-   */
-  private String flinkVersion;
-
-  /**
-   * program type
-   * 0 JAVA,1 SCALA,2 PYTHON
-   */
-  private ProgramType programType;
-
-  public ResourceInfo getMainJar() {
-    return mainJar;
-  }
-
-  public void setMainJar(ResourceInfo mainJar) {
-    this.mainJar = mainJar;
-  }
-
-  public String getMainClass() {
-    return mainClass;
-  }
-
-  public void setMainClass(String mainClass) {
-    this.mainClass = mainClass;
-  }
-
-  public String getDeployMode() {
-    return deployMode;
-  }
-
-  public void setDeployMode(String deployMode) {
-    this.deployMode = deployMode;
-  }
-
-  public String getMainArgs() {
-    return mainArgs;
-  }
-
-  public void setMainArgs(String mainArgs) {
-    this.mainArgs = mainArgs;
-  }
-
-  public int getSlot() {
-    return slot;
-  }
-
-  public void setSlot(int slot) {
-    this.slot = slot;
-  }
-
-  public String getAppName() {
-    return appName;
-  }
-
-  public void setAppName(String appName) {
-    this.appName = appName;
-  }
-
-  public int getTaskManager() {
-    return taskManager;
-  }
-
-  public void setTaskManager(int taskManager) {
-    this.taskManager = taskManager;
-  }
-
-  public String getJobManagerMemory() {
-    return jobManagerMemory;
-  }
-
-  public void setJobManagerMemory(String jobManagerMemory) {
-    this.jobManagerMemory = jobManagerMemory;
-  }
-
-  public String getTaskManagerMemory() {
-    return taskManagerMemory;
-  }
-
-  public void setTaskManagerMemory(String taskManagerMemory) {
-    this.taskManagerMemory = taskManagerMemory;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public void setQueue(String queue) {
-    this.queue = queue;
-  }
-
-  public List<ResourceInfo> getResourceList() {
-    return resourceList;
-  }
+    /**
+     * major jar
+     */
+    private ResourceInfo mainJar;
+
+    /**
+     * major class
+     */
+    private String mainClass;
+
+    /**
+     * deploy mode  yarn-cluster yarn-local
+     */
+    private String deployMode;
+
+    /**
+     * arguments
+     */
+    private String mainArgs;
+
+    /**
+     * slot count
+     */
+    private int slot;
+
+    /**
+     * parallelism
+     */
+    private int parallelism;
+
+    /**
+     * yarn application name
+     */
+    private String appName;
+
+    /**
+     * taskManager count
+     */
+    private int taskManager;
+
+    /**
+     * job manager memory
+     */
+    private String jobManagerMemory;
+
+    /**
+     * task manager memory
+     */
+    private String taskManagerMemory;
+
+    /**
+     * resource list
+     */
+    private List<ResourceInfo> resourceList = new ArrayList<>();
+
+    /**
+     * The YARN queue to submit to
+     */
+    private String queue;
+
+    /**
+     * other arguments
+     */
+    private String others;
+
+    /**
+     * flink version
+     */
+    private String flinkVersion;
+
+    /**
+     * program type
+     * 0 JAVA,1 SCALA,2 PYTHON
+     */
+    private ProgramType programType;
+
+    public ResourceInfo getMainJar() {
+        return mainJar;
+    }
+
+    public void setMainJar(ResourceInfo mainJar) {
+        this.mainJar = mainJar;
+    }
+
+    public String getMainClass() {
+        return mainClass;
+    }
+
+    public void setMainClass(String mainClass) {
+        this.mainClass = mainClass;
+    }
+
+    public String getDeployMode() {
+        return deployMode;
+    }
+
+    public void setDeployMode(String deployMode) {
+        this.deployMode = deployMode;
+    }
+
+    public String getMainArgs() {
+        return mainArgs;
+    }
+
+    public void setMainArgs(String mainArgs) {
+        this.mainArgs = mainArgs;
+    }
+
+    public int getSlot() {
+        return slot;
+    }
+
+    public void setSlot(int slot) {
+        this.slot = slot;
+    }
+
+    public int getParallelism() {
+        return parallelism;
+    }
+
+    public void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
+
+    public int getTaskManager() {
+        return taskManager;
+    }
 
-  public void setResourceList(List<ResourceInfo> resourceList) {
-    this.resourceList = resourceList;
-  }
+    public void setTaskManager(int taskManager) {
+        this.taskManager = taskManager;
+    }
 
-  public String getOthers() {
-    return others;
-  }
+    public String getJobManagerMemory() {
+        return jobManagerMemory;
+    }
+
+    public void setJobManagerMemory(String jobManagerMemory) {
+        this.jobManagerMemory = jobManagerMemory;
+    }
 
-  public void setOthers(String others) {
-    this.others = others;
-  }
+    public String getTaskManagerMemory() {
+        return taskManagerMemory;
+    }
 
-  public ProgramType getProgramType() {
-    return programType;
-  }
+    public void setTaskManagerMemory(String taskManagerMemory) {
+        this.taskManagerMemory = taskManagerMemory;
+    }
 
-  public void setProgramType(ProgramType programType) {
-    this.programType = programType;
-  }
+    public String getQueue() {
+        return queue;
+    }
 
-  public String getFlinkVersion() {
-    return flinkVersion;
-  }
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
 
-  public void setFlinkVersion(String flinkVersion) {
-    this.flinkVersion = flinkVersion;
-  }
+    public List<ResourceInfo> getResourceList() {
+        return resourceList;
+    }
 
-  @Override
-  public boolean checkParameters() {
-    return mainJar != null && programType != null;
-  }
+    public void setResourceList(List<ResourceInfo> resourceList) {
+        this.resourceList = resourceList;
+    }
 
+    public String getOthers() {
+        return others;
+    }
 
-  @Override
-  public List<ResourceInfo> getResourceFilesList() {
-    if (mainJar != null && !resourceList.contains(mainJar)) {
-      resourceList.add(mainJar);
+    public void setOthers(String others) {
+        this.others = others;
     }
-    return resourceList;
-  }
+
+    public ProgramType getProgramType() {
+        return programType;
+    }
+
+    public void setProgramType(ProgramType programType) {
+        this.programType = programType;
+    }
+
+    public String getFlinkVersion() {
+        return flinkVersion;
+    }
+
+    public void setFlinkVersion(String flinkVersion) {
+        this.flinkVersion = flinkVersion;
+    }
+
+    @Override
+    public boolean checkParameters() {
+        return mainJar != null && programType != null;
+    }
+
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        if (mainJar != null && !resourceList.contains(mainJar)) {
+            resourceList.add(mainJar);
+        }
+        return resourceList;
+    }
+
 }

+ 13 - 6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@@ -17,11 +17,11 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -53,7 +53,7 @@ public class FlinkArgsUtils {
             args.add(Constants.FLINK_YARN_CLUSTER);   //yarn-cluster
 
             int slot = param.getSlot();
-            if (slot != 0) {
+            if (slot > 0) {
                 args.add(Constants.FLINK_YARN_SLOT);
                 args.add(String.format("%d", slot));   //-ys
             }
@@ -68,7 +68,7 @@ public class FlinkArgsUtils {
             String flinkVersion = param.getFlinkVersion();
             if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
                 int taskManager = param.getTaskManager();
-                if (taskManager != 0) {                        //-yn
+                if (taskManager > 0) {                        //-yn
                     args.add(Constants.FLINK_TASK_MANAGE);
                     args.add(String.format("%d", taskManager));
                 }
@@ -92,12 +92,19 @@ public class FlinkArgsUtils {
                     args.add(queue);
                 }
             }
+        }
 
-            args.add(Constants.FLINK_DETACH); //-d
-
+        int parallelism = param.getParallelism();
+        if (parallelism > 0) {
+            args.add(Constants.FLINK_PARALLELISM);
+            args.add(String.format("%d", parallelism));   // -p
         }
 
-        // -p -s -yqu -yat -sae -yD -D
+        // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
+        // The task status will be synchronized with the cluster job status
+        args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
+
+        // -s -yqu -yat -yD -D
         if (StringUtils.isNotEmpty(others)) {
             args.add(others);
         }

+ 19 - 13
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@@ -17,19 +17,20 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+
+import java.util.List;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
 /**
  * Test FlinkArgsUtils
  */
@@ -39,6 +40,7 @@ public class FlinkArgsUtilsTest {
 
     public String mode = "cluster";
     public int slot = 2;
+    public int parallelism = 3;
     public String appName = "testFlink";
     public int taskManager = 4;
     public String taskManagerMemory = "2G";
@@ -48,7 +50,7 @@ public class FlinkArgsUtilsTest {
     public ResourceInfo mainJar = null;
     public String mainArgs = "testArgs --input file:///home";
     public String queue = "queue1";
-    public String others = "-p 4";
+    public String others = "-s hdfs:///flink/savepoint-1537";
     public String flinkVersion = "<1.10";
 
 
@@ -72,6 +74,7 @@ public class FlinkArgsUtilsTest {
         param.setMainClass(mainClass);
         param.setAppName(appName);
         param.setSlot(slot);
+        param.setParallelism(parallelism);
         param.setTaskManager(taskManager);
         param.setJobManagerMemory(jobManagerMemory);
         param.setTaskManagerMemory(taskManagerMemory);
@@ -89,7 +92,7 @@ public class FlinkArgsUtilsTest {
         }
 
         //Expected values and order
-        assertEquals(20, result.size());
+        assertEquals(22, result.size());
 
         assertEquals("-m", result.get(0));
         assertEquals("yarn-cluster", result.get(1));
@@ -112,15 +115,18 @@ public class FlinkArgsUtilsTest {
         assertEquals("-yqu", result.get(12));
         assertEquals(result.get(13),queue);
 
-        assertEquals("-d", result.get(14));
+        assertEquals("-p", result.get(14));
+        assertSame(Integer.valueOf(result.get(15)),parallelism);
+
+        assertEquals("-sae", result.get(16));
 
-        assertEquals(result.get(15),others);
+        assertEquals(result.get(17),others);
 
-        assertEquals("-c", result.get(16));
-        assertEquals(result.get(17),mainClass);
+        assertEquals("-c", result.get(18));
+        assertEquals(result.get(19),mainClass);
 
-        assertEquals(result.get(18),mainJar.getRes());
-        assertEquals(result.get(19),mainArgs);
+        assertEquals(result.get(20),mainJar.getRes());
+        assertEquals(result.get(21),mainArgs);
 
         //Others param without -yqu
         FlinkParameters param1 = new FlinkParameters();

+ 24 - 2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@@ -80,7 +80,7 @@
         </el-select>
       </div>
     </m-list-box>
-    <m-list-4-box v-if="deployMode === 'cluster'">
+    <m-list-box v-if="deployMode === 'cluster'">
       <div slot="text">{{$t('App Name')}}</div>
       <div slot="content">
         <el-input
@@ -91,7 +91,7 @@
           :placeholder="$t('Please enter app name(optional)')">
         </el-input>
       </div>
-    </m-list-4-box>
+    </m-list-box>
     <m-list-4-box v-if="deployMode === 'cluster'">
       <div slot="text">{{$t('JobManager Memory')}}</div>
       <div slot="content">
@@ -136,6 +136,18 @@
         </el-input>
       </div>
     </m-list-4-box>
+    <m-list-4-box>
+      <div slot="text">{{$t('Parallelism')}}</div>
+      <div slot="content">
+        <el-input
+          :disabled="isDetails"
+          type="input"
+          size="small"
+          v-model="parallelism"
+          :placeholder="$t('Please enter Parallelism')">
+        </el-input>
+      </div>
+    </m-list-4-box>
     <m-list-box>
       <div slot="text">{{$t('Main Arguments')}}</div>
       <div slot="content">
@@ -215,6 +227,8 @@
         localParams: [],
         // Slot number
         slot: 1,
+        // Parallelism
+        parallelism: 1,
         // TaskManager mumber
         taskManager: '2',
         // JobManager memory
@@ -320,6 +334,11 @@
           return false
         }
 
+        if (!Number.isInteger(parseInt(this.parallelism))) {
+          this.$message.warning(`${i18n.$t('Please enter Parallelism')}`)
+          return false
+        }
+
         if (this.flinkVersion === '<1.10' && !Number.isInteger(parseInt(this.taskManager))) {
           this.$message.warning(`${i18n.$t('Please enter TaskManager number')}`)
           return false
@@ -349,6 +368,7 @@
           localParams: this.localParams,
           flinkVersion: this.flinkVersion,
           slot: this.slot,
+          parallelism: this.parallelism,
           taskManager: this.taskManager,
           jobManagerMemory: this.jobManagerMemory,
           taskManagerMemory: this.taskManagerMemory,
@@ -485,6 +505,7 @@
           resourceList: this.resourceIdArr,
           localParams: this.localParams,
           slot: this.slot,
+          parallelism: this.parallelism,
           taskManager: this.taskManager,
           jobManagerMemory: this.jobManagerMemory,
           taskManagerMemory: this.taskManagerMemory,
@@ -516,6 +537,7 @@
         this.deployMode = o.params.deployMode || ''
         this.flinkVersion = o.params.flinkVersion || '<1.10'
         this.slot = o.params.slot || 1
+        this.parallelism = o.params.parallelism || 1
         this.taskManager = o.params.taskManager || '2'
         this.jobManagerMemory = o.params.jobManagerMemory || '1G'
         this.taskManagerMemory = o.params.taskManagerMemory || '2G'

+ 2 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@@ -123,6 +123,8 @@ export default {
   'Please enter TaskManager memory': 'Please enter TaskManager memory',
   'Slot Number': 'Slot Number',
   'Please enter Slot number': 'Please enter Slot number',
+  Parallelism: 'Parallelism',
+  'Please enter Parallelism': 'Please enter Parallelism',
   'TaskManager Number': 'TaskManager Number',
   'Please enter TaskManager number': 'Please enter TaskManager number',
   'App Name': 'App Name',

+ 2 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -123,6 +123,8 @@ export default {
   'Please enter TaskManager memory': '请输入TaskManager内存数',
   'Slot Number': 'Slot数量',
   'Please enter Slot number': '请输入Slot数量',
+  Parallelism: '并行度',
+  'Please enter Parallelism': '请输入并行度',
   'TaskManager Number': 'TaskManager数量',
   'Please enter TaskManager number': '请输入TaskManager数量',
   'App Name': '任务名称',