Browse Source

flink task support(flink 任务支持) (#711)

* flink任务支持

* flink任务支持

* Update zh_CN.js

* Update FlinkArgsUtils.java

* Update .escheduler_env.sh
Love EvenWong 5 years ago
parent
commit
02598a7b45

+ 14 - 0
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@@ -906,4 +906,18 @@ public final class Constants {
      * hive conf
      */
     public static final String HIVE_CONF = "hiveconf:";
+
+    //flink 任务
+    public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
+    public static final String FLINK_RUN_MODE = "-m";
+    public static final String FLINK_YARN_SLOT = "-ys";
+    public static final String FLINK_APP_NAME = "-ynm";
+    public static final String FLINK_TASK_MANAGE = "-yn";
+
+    public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
+    public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
+    public static final String FLINK_detach = "-d";
+    public static final String FLINK_MAIN_CLASS = "-c";
+
+
 }

+ 2 - 1
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java

@@ -29,8 +29,9 @@ public enum TaskType {
      * 5 SPARK
      * 6 PYTHON
      * 7 DEPENDENT
+     * 8 FLINK
      */
-    SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT;
+    SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT,FLINK;
 
     public static boolean typeIsNormalTask(String typeName) {
         TaskType taskType = TaskType.valueOf(typeName);

+ 219 - 0
escheduler-common/src/main/java/cn/escheduler/common/task/flink/FlinkParameters.java

@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cn.escheduler.common.task.flink;
+
+import cn.escheduler.common.enums.ProgramType;
+import cn.escheduler.common.process.ResourceInfo;
+import cn.escheduler.common.task.AbstractParameters;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * spark parameters
+ */
+public class FlinkParameters extends AbstractParameters {
+
+  /**
+   * major jar
+   */
+  private ResourceInfo mainJar;
+
+  /**
+   * major class
+   */
+  private String mainClass;
+
+  /**
+   * deploy mode  yarn-cluster  yarn-client  yarn-local
+    */
+  private String deployMode;
+
+  /**
+   * arguments
+   */
+  private String mainArgs;
+
+  /**
+   * slot个数
+   */
+  private int slot;
+
+  /**
+   *Yarn application的名字
+   */
+
+  private String appName;
+
+  /**
+   * taskManager 数量
+   */
+  private int  taskManager;
+
+  /**
+   * jobManagerMemory 内存大小
+   */
+  private String  jobManagerMemory ;
+
+  /**
+   * taskManagerMemory内存大小
+   */
+  private String  taskManagerMemory;
+
+  /**
+   * resource list
+   */
+  private List<ResourceInfo> resourceList;
+
+  /**
+   * The YARN queue to submit to
+   */
+  private String queue;
+
+  /**
+   * other arguments
+   */
+  private String others;
+
+  /**
+   * program type
+   * 0 JAVA,1 SCALA,2 PYTHON
+   */
+  private ProgramType programType;
+
+  public ResourceInfo getMainJar() {
+    return mainJar;
+  }
+
+  public void setMainJar(ResourceInfo mainJar) {
+    this.mainJar = mainJar;
+  }
+
+  public String getMainClass() {
+    return mainClass;
+  }
+
+  public void setMainClass(String mainClass) {
+    this.mainClass = mainClass;
+  }
+
+  public String getDeployMode() {
+    return deployMode;
+  }
+
+  public void setDeployMode(String deployMode) {
+    this.deployMode = deployMode;
+  }
+
+  public String getMainArgs() {
+    return mainArgs;
+  }
+
+  public void setMainArgs(String mainArgs) {
+    this.mainArgs = mainArgs;
+  }
+
+  public int getSlot() {
+    return slot;
+  }
+
+  public void setSlot(int slot) {
+    this.slot = slot;
+  }
+
+  public String getAppName() {
+    return appName;
+  }
+
+  public void setAppName(String appName) {
+    this.appName = appName;
+  }
+
+  public int getTaskManager() {
+    return taskManager;
+  }
+
+  public void setTaskManager(int taskManager) {
+    this.taskManager = taskManager;
+  }
+
+  public String getJobManagerMemory() {
+    return jobManagerMemory;
+  }
+
+  public void setJobManagerMemory(String jobManagerMemory) {
+    this.jobManagerMemory = jobManagerMemory;
+  }
+
+  public String getTaskManagerMemory() {
+    return taskManagerMemory;
+  }
+
+  public void setTaskManagerMemory(String taskManagerMemory) {
+    this.taskManagerMemory = taskManagerMemory;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  public List<ResourceInfo> getResourceList() {
+    return resourceList;
+  }
+
+  public void setResourceList(List<ResourceInfo> resourceList) {
+    this.resourceList = resourceList;
+  }
+
+  public String getOthers() {
+    return others;
+  }
+
+  public void setOthers(String others) {
+    this.others = others;
+  }
+
+  public ProgramType getProgramType() {
+    return programType;
+  }
+
+  public void setProgramType(ProgramType programType) {
+    this.programType = programType;
+  }
+
+  @Override
+  public boolean checkParameters() {
+    return mainJar != null && programType != null;
+  }
+
+
+  @Override
+  public List<String> getResourceFilesList() {
+    if(resourceList !=null ) {
+      this.resourceList.add(mainJar);
+      return resourceList.stream()
+              .map(p -> p.getRes()).collect(Collectors.toList());
+    }
+    return null;
+  }
+
+
+}

+ 3 - 0
escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java

@@ -19,6 +19,7 @@ package cn.escheduler.common.utils;
 import cn.escheduler.common.enums.TaskType;
 import cn.escheduler.common.task.AbstractParameters;
 import cn.escheduler.common.task.dependent.DependentParameters;
+import cn.escheduler.common.task.flink.FlinkParameters;
 import cn.escheduler.common.task.mr.MapreduceParameters;
 import cn.escheduler.common.task.procedure.ProcedureParameters;
 import cn.escheduler.common.task.python.PythonParameters;
@@ -63,6 +64,8 @@ public class TaskParametersUtils {
           return JSONUtils.parseObject(parameter, PythonParameters.class);
         case DEPENDENT:
           return JSONUtils.parseObject(parameter, DependentParameters.class);
+        case FLINK:
+          return JSONUtils.parseObject(parameter, FlinkParameters.class);
         default:
           return null;
       }

+ 110 - 0
escheduler-server/src/main/java/cn/escheduler/server/utils/FlinkArgsUtils.java

@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cn.escheduler.server.utils;
+
+
+import cn.escheduler.common.Constants;
+import cn.escheduler.common.enums.ProgramType;
+import cn.escheduler.common.task.flink.FlinkParameters;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ *  spark args utils
+ */
+public class FlinkArgsUtils {
+
+    /**
+     *  build args
+     * @param param
+     * @return
+     */
+    public static List<String> buildArgs(FlinkParameters param) {
+        List<String> args = new ArrayList<>();
+
+        args.add(Constants.FLINK_RUN_MODE);  //-m
+
+        args.add(Constants.FLINK_YARN_CLUSTER);   //yarn-cluster
+
+        if (param.getSlot() != 0) {
+            args.add(Constants.FLINK_YARN_SLOT);
+            args.add(String.format("%d", param.getSlot()));   //-ys
+        }
+
+        if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm
+            args.add(Constants.FLINK_APP_NAME);
+            args.add(param.getAppName());
+        }
+
+        if (param.getTaskManager() != 0) {                        //-yn
+            args.add(Constants.FLINK_TASK_MANAGE);
+            args.add(String.format("%d", param.getTaskManager()));
+        }
+
+        if (StringUtils.isNotEmpty(param.getJobManagerMemory())) {
+            args.add(Constants.FLINK_JOB_MANAGE_MEM);
+            args.add(param.getJobManagerMemory()); //-yjm
+        }
+
+        if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm
+            args.add(Constants.FLINK_TASK_MANAGE_MEM);
+            args.add(param.getTaskManagerMemory());
+        }
+        args.add(Constants.FLINK_detach); //-d
+
+
+        if(param.getProgramType() !=null ){
+            if(param.getProgramType()!=ProgramType.PYTHON){
+                if (StringUtils.isNotEmpty(param.getMainClass())) {
+                    args.add(Constants.FLINK_MAIN_CLASS);    //-c
+                    args.add(param.getMainClass());          //main class
+                }
+            }
+        }
+
+        if (param.getMainJar() != null) {
+            args.add(param.getMainJar().getRes());
+        }
+
+
+        // --files --conf --libjar ...
+      if (StringUtils.isNotEmpty(param.getOthers())) {
+            String others = param.getOthers();
+            if(!others.contains("--queue")){
+                if (StringUtils.isNotEmpty(param.getQueue())) {
+                    args.add(Constants.SPARK_QUEUE);
+                    args.add(param.getQueue());
+                }
+            }
+            args.add(param.getOthers());
+        }else if (StringUtils.isNotEmpty(param.getQueue())) {
+            args.add(Constants.SPARK_QUEUE);
+            args.add(param.getQueue());
+
+        }
+
+       if (StringUtils.isNotEmpty(param.getMainArgs())) {
+            args.add(param.getMainArgs());
+        }
+
+        return args;
+    }
+
+}

+ 3 - 0
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java

@@ -22,6 +22,7 @@ import cn.escheduler.common.enums.TaskRecordStatus;
 import cn.escheduler.common.enums.TaskType;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
+import cn.escheduler.common.task.flink.FlinkParameters;
 import cn.escheduler.common.task.mr.MapreduceParameters;
 import cn.escheduler.common.task.procedure.ProcedureParameters;
 import cn.escheduler.common.task.python.PythonParameters;
@@ -178,6 +179,8 @@ public abstract class AbstractTask {
             case SPARK:
                 paramsClass = SparkParameters.class;
                 break;
+            case FLINK:
+                paramsClass = FlinkParameters.class;
             case PYTHON:
                 paramsClass = PythonParameters.class;
                 break;

+ 3 - 0
escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java

@@ -19,6 +19,7 @@ package cn.escheduler.server.worker.task;
 
 import cn.escheduler.common.enums.TaskType;
 import cn.escheduler.server.worker.task.dependent.DependentTask;
+import cn.escheduler.server.worker.task.flink.FlinkTask;
 import cn.escheduler.server.worker.task.mr.MapReduceTask;
 import cn.escheduler.server.worker.task.processdure.ProcedureTask;
 import cn.escheduler.server.worker.task.python.PythonTask;
@@ -55,6 +56,8 @@ public class TaskManager {
         return new MapReduceTask(props, logger);
       case SPARK:
         return new SparkTask(props, logger);
+      case FLINK:
+        return new FlinkTask(props, logger);
       case PYTHON:
         return new PythonTask(props, logger);
       case DEPENDENT:

+ 118 - 0
escheduler-server/src/main/java/cn/escheduler/server/worker/task/flink/FlinkTask.java

@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cn.escheduler.server.worker.task.flink;
+
+import cn.escheduler.common.process.Property;
+import cn.escheduler.common.task.AbstractParameters;
+import cn.escheduler.common.task.flink.FlinkParameters;
+import cn.escheduler.common.utils.JSONUtils;
+import cn.escheduler.common.utils.ParameterUtils;
+import cn.escheduler.dao.model.ProcessInstance;
+import cn.escheduler.server.utils.FlinkArgsUtils;
+import cn.escheduler.server.utils.ParamUtils;
+import cn.escheduler.server.worker.task.AbstractYarnTask;
+import cn.escheduler.server.worker.task.TaskProps;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *  flink task
+ */
+public class FlinkTask extends AbstractYarnTask {
+
+  /**
+   *  flink command
+   */
+  private static final String FLINK_COMMAND = "flink";
+  private static final String FLINK_RUN = "run";
+
+  /**
+   *  flink parameters
+   */
+  private FlinkParameters flinkParameters;
+
+  public FlinkTask(TaskProps props, Logger logger) {
+    super(props, logger);
+  }
+
+  @Override
+  public void init() {
+
+    logger.info("flink task params {}", taskProps.getTaskParams());
+
+    flinkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), FlinkParameters.class);
+
+    if (!flinkParameters.checkParameters()) {
+      throw new RuntimeException("flink task params is not valid");
+    }
+    flinkParameters.setQueue(taskProps.getQueue());
+
+    if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
+      String args = flinkParameters.getMainArgs();
+      // get process instance by task instance id
+      ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+
+      /**
+       *  combining local and global parameters
+       */
+      Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+              taskProps.getDefinedParams(),
+              flinkParameters.getLocalParametersMap(),
+              processInstance.getCmdTypeIfComplement(),
+              processInstance.getScheduleTime());
+
+      logger.info("param Map : {}", paramsMap);
+      if (paramsMap != null ){
+
+        args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
+        logger.info("param args : {}", args);
+      }
+      flinkParameters.setMainArgs(args);
+    }
+  }
+
+  /**
+   *  create command
+   * @return
+   */
+  @Override
+  protected String buildCommand() {
+    List<String> args = new ArrayList<>();
+
+    args.add(FLINK_COMMAND);
+    args.add(FLINK_RUN);
+    logger.info("flink task args : {}", args);
+    // other parameters
+    args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
+
+    String command = ParameterUtils
+            .convertParameterPlaceholders(String.join(" ", args), taskProps.getDefinedParams());
+
+    logger.info("flink task command : {}", command);
+
+    return command;
+  }
+
+  @Override
+  public AbstractParameters getParameters() {
+    return flinkParameters;
+  }
+}

+ 4 - 0
escheduler-ui/src/js/conf/home/pages/dag/_source/config.js

@@ -260,6 +260,10 @@ let tasksType = {
     desc: 'SPARK',
     color: '#E46F13'
   },
+  'FLINK': {
+    desc: 'FLINK',
+    color: '#E46F13'
+  },
   'MR': {
     desc: 'MapReduce',
     color: '#A0A5CC'

+ 3 - 0
escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@@ -70,6 +70,9 @@
   .icos-SPARK {
     background: url("../img/toolbar_SPARK.png") no-repeat 50% 50%;
   }
+  .icos-FLINK {
+    background: url("../img/toobar_flink.svg") no-repeat 50% 50%;
+  }
   .icos-MR {
     background: url("../img/toolbar_MR.png") no-repeat 50% 50%;
   }

+ 68 - 60
escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@@ -19,13 +19,13 @@
           <div class="cont-box">
             <label class="label-box">
               <x-input
-                      type="text"
-                      v-model="name"
-                      :disabled="isDetails"
-                      :placeholder="$t('Please enter name(required)')"
-                      maxlength="100"
-                      @on-blur="_verifName()"
-                      autocomplete="off">
+                type="text"
+                v-model="name"
+                :disabled="isDetails"
+                :placeholder="$t('Please enter name(required)')"
+                maxlength="100"
+                @on-blur="_verifName()"
+                autocomplete="off">
               </x-input>
             </label>
           </div>
@@ -52,13 +52,13 @@
           <div class="cont-box">
             <label class="label-box">
               <x-input
-                      resize
-                      :autosize="{minRows:2}"
-                      type="textarea"
-                      :disabled="isDetails"
-                      v-model="desc"
-                      :placeholder="$t('Please enter description')"
-                      autocomplete="off">
+                resize
+                :autosize="{minRows:2}"
+                type="textarea"
+                :disabled="isDetails"
+                v-model="desc"
+                :placeholder="$t('Please enter description')"
+                autocomplete="off">
               </x-input>
             </label>
           </div>
@@ -96,68 +96,74 @@
 
         <!-- Task timeout alarm -->
         <m-timeout-alarm
-                ref="timeout"
-                :backfill-item="backfillItem"
-                @on-timeout="_onTimeout">
+          ref="timeout"
+          :backfill-item="backfillItem"
+          @on-timeout="_onTimeout">
         </m-timeout-alarm>
 
         <!-- shell node -->
         <m-shell
-                v-if="taskType === 'SHELL'"
-                @on-params="_onParams"
-                ref="SHELL"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'SHELL'"
+          @on-params="_onParams"
+          ref="SHELL"
+          :backfill-item="backfillItem">
         </m-shell>
         <!-- sub_process node -->
         <m-sub-process
-                v-if="taskType === 'SUB_PROCESS'"
-                @on-params="_onParams"
-                @on-set-process-name="_onSetProcessName"
-                ref="SUB_PROCESS"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'SUB_PROCESS'"
+          @on-params="_onParams"
+          @on-set-process-name="_onSetProcessName"
+          ref="SUB_PROCESS"
+          :backfill-item="backfillItem">
         </m-sub-process>
         <!-- procedure node -->
         <m-procedure
-                v-if="taskType === 'PROCEDURE'"
-                @on-params="_onParams"
-                ref="PROCEDURE"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'PROCEDURE'"
+          @on-params="_onParams"
+          ref="PROCEDURE"
+          :backfill-item="backfillItem">
         </m-procedure>
         <!-- sql node -->
         <m-sql
-                v-if="taskType === 'SQL'"
-                @on-params="_onParams"
-                ref="SQL"
-                :create-node-id="id"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'SQL'"
+          @on-params="_onParams"
+          ref="SQL"
+          :create-node-id="id"
+          :backfill-item="backfillItem">
         </m-sql>
         <!-- spark node -->
         <m-spark
-                v-if="taskType === 'SPARK'"
-                @on-params="_onParams"
-                ref="SPARK"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'SPARK'"
+          @on-params="_onParams"
+          ref="SPARK"
+          :backfill-item="backfillItem">
         </m-spark>
+        <m-flink
+          v-if="taskType === 'FLINK'"
+          @on-params="_onParams"
+          ref="FLINK"
+          :backfill-item="backfillItem">
+        </m-flink>
         <!-- mr node -->
         <m-mr
-                v-if="taskType === 'MR'"
-                @on-params="_onParams"
-                ref="MR"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'MR'"
+          @on-params="_onParams"
+          ref="MR"
+          :backfill-item="backfillItem">
         </m-mr>
         <!-- python node -->
         <m-python
-                v-if="taskType === 'PYTHON'"
-                @on-params="_onParams"
-                ref="PYTHON"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'PYTHON'"
+          @on-params="_onParams"
+          ref="PYTHON"
+          :backfill-item="backfillItem">
         </m-python>
         <!-- dependent node -->
         <m-dependent
-                v-if="taskType === 'DEPENDENT'"
-                @on-dependent="_onDependent"
-                ref="DEPENDENT"
-                :backfill-item="backfillItem">
+          v-if="taskType === 'DEPENDENT'"
+          @on-dependent="_onDependent"
+          ref="DEPENDENT"
+          :backfill-item="backfillItem">
         </m-dependent>
 
       </div>
@@ -178,6 +184,7 @@
   import i18n from '@/module/i18n'
   import mShell from './tasks/shell'
   import mSpark from './tasks/spark'
+  import mFlink from './tasks/flink'
   import mPython from './tasks/python'
   import JSP from './../plugIn/jsPlumbHandle'
   import mProcedure from './tasks/procedure'
@@ -284,12 +291,12 @@
           }
           this.store.dispatch('dag/getSubProcessId', { taskId: stateId }).then(res => {
             this.$emit('onSubProcess', {
-              subProcessId: res.data.subProcessInstanceId,
-              fromThis: this
-            })
-          }).catch(e => {
-            this.$message.error(e.msg || '')
+            subProcessId: res.data.subProcessInstanceId,
+            fromThis: this
           })
+        }).catch(e => {
+            this.$message.error(e.msg || '')
+        })
         } else {
           this.$emit('onSubProcess', {
             subProcessId: this.backfillItem.params.processDefinitionId,
@@ -413,10 +420,10 @@
       if (taskList.length) {
         taskList.forEach(v => {
           if (v.id === this.id) {
-            o = v
-            this.backfillItem = v
-          }
-        })
+          o = v
+          this.backfillItem = v
+        }
+      })
         // Non-null objects represent backfill
         if (!_.isEmpty(o)) {
           this.name = o.name
@@ -455,6 +462,7 @@
       mSql,
       mLog,
       mSpark,
+      mFlink,
       mPython,
       mDependent,
       mSelectInput,

+ 388 - 0
escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@@ -0,0 +1,388 @@
+<template>
+  <div class="flink-model">
+    <m-list-box>
+      <div slot="text">{{$t('Program Type')}}</div>
+      <div slot="content">
+        <x-select
+                style="width: 130px;"
+                v-model="programType"
+                :disabled="isDetails">
+          <x-option
+                  v-for="city in programTypeList"
+                  :key="city.code"
+                  :value="city.code"
+                  :label="city.code">
+          </x-option>
+        </x-select>
+      </div>
+    </m-list-box>
+
+    <m-list-box v-if="programType !== 'PYTHON'">
+      <div slot="text">{{$t('Main class')}}</div>
+      <div slot="content">
+        <x-input
+                :disabled="isDetails"
+                type="input"
+                v-model="mainClass"
+                :placeholder="$t('Please enter main class')"
+                autocomplete="off">
+        </x-input>
+      </div>
+    </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Main jar package')}}</div>
+      <div slot="content">
+        <x-select
+                style="width: 100%;"
+                :placeholder="$t('Please enter main jar package')"
+                v-model="mainJar"
+                filterable
+                :disabled="isDetails">
+          <x-option
+                  v-for="city in mainJarList"
+                  :key="city.code"
+                  :value="city.code"
+                  :label="city.code">
+          </x-option>
+        </x-select>
+      </div>
+    </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Deploy Mode')}}</div>
+      <div slot="content">
+        <x-radio-group v-model="deployMode">
+          <x-radio :label="'cluster'" :disabled="isDetails"></x-radio>
+        </x-radio-group>
+      </div>
+    </m-list-box>
+    <div class="list-box-4p">
+      <div class="clearfix list">
+        <span class="sp1">{{$t('slot')}}</span>
+        <span class="sp2">
+          <x-input
+                  :disabled="isDetails"
+                  type="input"
+                  v-model="slot"
+                  :placeholder="$t('Please enter driver core number')"
+                  style="width: 200px;"
+                  autocomplete="off">
+        </x-input>
+        </span>
+        <span class="sp1 sp3">{{$t('taskManager')}}</span>
+        <span class="sp2">
+          <x-input
+                  :disabled="isDetails"
+                  type="input"
+                  v-model="taskManager"
+                  :placeholder="$t('Please enter driver memory use')"
+                  style="width: 186px;"
+                  autocomplete="off">
+        </x-input>
+        </span>
+      </div>
+      <div class="clearfix list">
+        <span class="sp1">{{$t('jobManagerMemory')}}</span>
+        <span class="sp2">
+          <x-input
+                  :disabled="isDetails"
+                  type="input"
+                  v-model="jobManagerMemory"
+                  :placeholder="$t('Please enter the number of Executor')"
+                  style="width: 200px;"
+                  autocomplete="off">
+        </x-input>
+        </span>
+        <span class="sp1 sp3">{{$t('taskManagerMemory')}}</span>
+        <span class="sp2">
+          <x-input
+                  :disabled="isDetails"
+                  type="input"
+                  v-model="taskManagerMemory"
+                  :placeholder="$t('Please enter the Executor memory')"
+                  style="width: 186px;"
+                  autocomplete="off">
+        </x-input>
+        </span>
+      </div>
+
+    </div>
+    <m-list-box>
+      <div slot="text">{{$t('Command-line parameters')}}</div>
+      <div slot="content">
+        <x-input
+                :autosize="{minRows:2}"
+                :disabled="isDetails"
+                type="textarea"
+                v-model="mainArgs"
+                :placeholder="$t('Please enter Command-line parameters')"
+                autocomplete="off">
+        </x-input>
+      </div>
+    </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Other parameters')}}</div>
+      <div slot="content">
+        <x-input
+                :disabled="isDetails"
+                :autosize="{minRows:2}"
+                type="textarea"
+                v-model="others"
+                :placeholder="$t('Please enter other parameters')">
+        </x-input>
+      </div>
+    </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Resources')}}</div>
+      <div slot="content">
+        <m-resources
+                ref="refResources"
+                @on-resourcesData="_onResourcesData"
+                :resource-list="resourceList">
+        </m-resources>
+      </div>
+    </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Custom Parameters')}}</div>
+      <div slot="content">
+        <m-local-params
+                ref="refLocalParams"
+                @on-local-params="_onLocalParams"
+                :udp-list="localParams"
+                :hide="false">
+        </m-local-params>
+      </div>
+    </m-list-box>
+  </div>
+</template>
+<script>
+  import _ from 'lodash'
+  import i18n from '@/module/i18n'
+  import mLocalParams from './_source/localParams'
+  import mListBox from './_source/listBox'
+  import mResources from './_source/resources'
+  import disabledState from '@/module/mixin/disabledState'
+
+  export default {
+    name: 'flink',
+    data () {
+      return {
+        // Main function class
+        mainClass: '',
+        // Master jar package
+        mainJar: null,
+        // Master jar package(List)
+        mainJarList: [],
+        // Deployment method
+        deployMode: 'cluster',
+        // Resource(list)
+        resourceList: [],
+        // Custom function
+        localParams: [],
+        // Driver Number of cores
+        slot: 1,
+        // Driver Number of memory
+        taskManager: '2',
+        // Executor Number
+        jobManagerMemory: '1G',
+        // Executor Number of memory
+        taskManagerMemory: '2G',
+        // Executor Number of cores
+        executorCores: 2,
+        // Command line argument
+        mainArgs: '',
+        // Other parameters
+        others: '',
+        // Program type
+        programType: 'SCALA',
+        // Program type(List)
+        programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }]
+      }
+    },
+    props: {
+      backfillItem: Object
+    },
+    mixins: [disabledState],
+    methods: {
+      /**
+       * return localParams
+       */
+      _onLocalParams (a) {
+        this.localParams = a
+      },
+      /**
+       * return resourceList
+       */
+      _onResourcesData (a) {
+        this.resourceList = a
+      },
+      /**
+       * verification
+       */
+      _verification () {
+        if (this.programType !== 'PYTHON' && !this.mainClass) {
+          this.$message.warning(`${i18n.$t('Please enter main class')}`)
+          return false
+        }
+
+
+        if (!this.mainJar) {
+          this.$message.warning(`${i18n.$t('Please enter main jar package')}`)
+          return false
+        }
+
+        if (!this.jobManagerMemory) {
+          this.$message.warning(`${i18n.$t('Please enter the number of Executor')}`)
+          return false
+        }
+
+        if (!Number.isInteger(parseInt(this.jobManagerMemory))) {
+          this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`)
+          return false
+        }
+
+        if (!this.taskManagerMemory) {
+          this.$message.warning(`${i18n.$t('Please enter the Executor memory')}`)
+          return false
+        }
+
+        if (!this.taskManagerMemory) {
+          this.$message.warning(`${i18n.$t('Please enter the Executor memory')}`)
+          return false
+        }
+
+        if (!_.isNumber(parseInt(this.taskManagerMemory))) {
+          this.$message.warning(`${i18n.$t('Memory should be a positive integer')}`)
+          return false
+        }
+
+        if (!this.executorCores) {
+          this.$message.warning(`${i18n.$t('Please enter ExecutorPlease enter Executor core number')}`)
+          return false
+        }
+
+        if (!Number.isInteger(parseInt(this.executorCores))) {
+          this.$message.warning(`${i18n.$t('Core number should be positive integer')}`)
+          return false
+        }
+
+        if (!this.$refs.refResources._verifResources()) {
+          return false
+        }
+
+        // localParams Subcomponent verification
+        if (!this.$refs.refLocalParams._verifProp()) {
+          return false
+        }
+
+        // storage
+        this.$emit('on-params', {
+          mainClass: this.mainClass,
+          mainJar: {
+            res: this.mainJar
+          },
+          deployMode: this.deployMode,
+          resourceList: this.resourceList,
+          localParams: this.localParams,
+          slot: this.slot,
+          taskManager: this.taskManager,
+          jobManagerMemory: this.jobManagerMemory,
+          taskManagerMemory: this.taskManagerMemory,
+          executorCores: this.executorCores,
+          mainArgs: this.mainArgs,
+          others: this.others,
+          programType: this.programType
+        })
+        return true
+      },
+      /**
+       * get resources list
+       */
+      _getResourcesList () {
+        return new Promise((resolve, reject) => {
+          let isJar = (alias) => {
+            return alias.substring(alias.lastIndexOf('.') + 1, alias.length) !== 'jar'
+          }
+          this.mainJarList = _.map(_.cloneDeep(this.store.state.dag.resourcesListS), v => {
+            return {
+              id: v.id,
+              code: v.alias,
+              disabled: isJar(v.alias)
+            }
+          })
+          resolve()
+        })
+      }
+    },
+    watch: {
+      // Listening type
+      programType (type) {
+        if (type === 'PYTHON') {
+          this.mainClass = ''
+        }
+      }
+    },
+    created () {
+      this._getResourcesList().then(() => {
+        let o = this.backfillItem
+
+        // Non-null objects represent backfill
+        if (!_.isEmpty(o)) {
+          this.mainClass = o.params.mainClass || ''
+          this.mainJar = o.params.mainJar.res || ''
+          this.deployMode = o.params.deployMode || ''
+          this.slot = o.params.slot || 1
+          this.taskManager = o.params.taskManager || '2'
+          this.jobManagerMemory = o.params.jobManagerMemory || '1G'
+          this.taskManagerMemory = o.params.taskManagerMemory || '2G'
+
+          this.mainArgs = o.params.mainArgs || ''
+          this.others = o.params.others
+          this.programType = o.params.programType || 'SCALA'
+
+          // backfill resourceList
+          let resourceList = o.params.resourceList || []
+          if (resourceList.length) {
+            this.resourceList = resourceList
+          }
+
+          // backfill localParams
+          let localParams = o.params.localParams || []
+          if (localParams.length) {
+            this.localParams = localParams
+          }
+        }
+      })
+    },
+    mounted () {
+
+    },
+    components: { mLocalParams, mListBox, mResources }
+  }
+</script>
+
+<style lang="scss" rel="stylesheet/scss">
+  .flink-model {
+    .list-box-4p {
+      .list {
+        margin-bottom: 14px;
+        .sp1 {
+          float: left;
+          width: 112px;
+          text-align: right;
+          margin-right: 10px;
+          font-size: 14px;
+          color: #777;
+          display: inline-block;
+          padding-top: 6px;
+        }
+        .sp2 {
+          float: left;
+          margin-right: 4px;
+        }
+        .sp3 {
+          width: 176px;
+        }
+      }
+    }
+  }
+</style>

File diff suppressed because it is too large
+ 211 - 0
escheduler-ui/src/js/conf/home/pages/dag/img/toobar_flink.svg


+ 5 - 1
escheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -476,5 +476,9 @@ export default {
   'warning of timeout': '超时告警',
   'Next five execution times': '接下来五次执行时间',
   'Execute time': '执行时间',
-  'Complement range': '补数范围'
+  'Complement range': '补数范围',
+  'slot':'slot数量',
+  'taskManager':'taskManage数量',
+  'jobManagerMemory':'jobManager内存数',
+  'taskManagerMemory':'taskManager内存数'
 }

BIN
escheduler-ui/src/view/docs/zh_CN/_book/images/flink_edit.png


+ 2 - 2
script/env/.escheduler_env.sh

@@ -5,5 +5,5 @@ export SPARK_HOME2=/opt/soft/spark2
 export PYTHON_HOME=/opt/soft/python
 export JAVA_HOME=/opt/soft/java
 export HIVE_HOME=/opt/soft/hive
-	
-export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
+export FLINK_HOME=/opt/soft/flink
+export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$PATH