Browse Source

[Feature-13052][Task Plugin] Support Apache Seatunnel Connector-V2 (#13086)

Assert 2 years ago
parent
commit
4439c1ec90

+ 5 - 3
docs/docs/en/guide/task/seatunnel.md

@@ -2,7 +2,7 @@
 
 ## Overview
 
-`SeaTunnel` task type for creating and executing `SeaTunnel` tasks. When the worker executes this task, it will parse the config file through the `start-seatunnel-spark.sh` or `start-seatunnel-flink.sh` command.
+`SeaTunnel` task type for creating and executing `SeaTunnel` tasks. When the worker executes this task, it will parse the config file through the `start-seatunnel-spark.sh` , `start-seatunnel-flink.sh` or `seatunnel.sh` command.
 Click [here](https://seatunnel.apache.org/) for more information about `Apache SeaTunnel`.
 
 ## Create Task
@@ -16,13 +16,15 @@ Click [here](https://seatunnel.apache.org/) for more information about `Apache S
 [//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.)
 
 - Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
-- Engine: Supports FLINK and SPARK
+- Engine: Supports FLINK, SPARK AND SEATUNNEL_ENGINE
 - FLINK
 - Run model: supports `run` and `run-application` modes
 - Option parameters: used to add the parameters of the Flink engine, such as `-m yarn-cluster -ynm seatunnel`
 - SPARK
-- Deployment mode: specify the deployment mode, `cluster` `client` `local`
+- Deployment mode: specify the deployment mode, `cluster` `client`
 - Master: Specify the `Master` model, `yarn` `local` `spark` `mesos`, where `spark` and `mesos` need to specify the `Master` service address, for example: 127.0.0.1:7077
+- SEATUNNEL_ENGINE
+- Deployment mode: specify the deployment mode, `cluster` `local`
 
           > Click [here](https://seatunnel.apache.org/docs/2.1.2/command/usage) for more information on the usage of `Apache SeaTunnel command`
 

+ 5 - 3
docs/docs/zh/guide/task/seatunnel.md

@@ -2,7 +2,7 @@
 
 ## 综述
 
-`SeaTunnel` 任务类型,用于创建并执行 `SeaTunnel` 类型任务。worker 执行该任务的时候,会通过 `start-seatunnel-spark.sh` 或 `start-seatunnel-flink.sh` 命令解析 config 文件。
+`SeaTunnel` 任务类型,用于创建并执行 `SeaTunnel` 类型任务。worker 执行该任务的时候,会通过 `start-seatunnel-spark.sh` 、 `start-seatunnel-flink.sh` 和 `seatunnel.sh` 命令解析 config 文件。
 点击 [这里](https://seatunnel.apache.org/) 获取更多关于 `Apache SeaTunnel` 的信息。
 
 ## 创建任务
@@ -16,13 +16,15 @@
 [//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。)
 
 - 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
-- 引擎:支持 FLINK 和 SPARK
+- 引擎:支持 FLINK 、 SPARK 和 SEATUNNEL_ENGINE
 - FLINK
 - 运行模型:支持 `run` 和 `run-application` 两种模式
 - 选项参数:用于添加 Flink 引擎本身参数,例如 `-m yarn-cluster -ynm seatunnel`
 - SPARK
-- 部署方式:指定部署模式,`cluster` `client` `local`
+- 部署方式:指定部署模式,`cluster` `client`
 - Master:指定 `Master` 模型,`yarn` `local` `spark` `mesos`,其中 `spark` 和 `mesos` 需要指定 `Master` 服务地址,例如:127.0.0.1:7077
+- SEATUNNEL_ENGINE
+- 部署方式:指定部署模式,`cluster` `local`
 
           > 点击 [这里](https://seatunnel.apache.org/docs/2.1.2/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息
 

+ 1 - 1
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java

@@ -21,7 +21,7 @@ public enum DeployModeEnum {
 
     cluster("cluster"),
     client("client"),
-    local("client");
+    local("local");
 
     private String command;
 

+ 7 - 1
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java

@@ -20,7 +20,13 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
 public enum EngineEnum {
 
     FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
-    SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
+    SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"),
+
+    FLINK_V2("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-connector-v2.sh"),
+
+    SPARK_V2("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-connector-v2.sh"),
+
+    SEATUNNEL_ENGINE("${SEATUNNEL_HOME}/bin/seatunnel.sh");
 
     private String command;
 

+ 10 - 4
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java

@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkTask;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.self.SeatunnelEngineTask;
 import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkTask;
 
 public class SeatunnelTaskChannel implements TaskChannel {
@@ -37,10 +38,15 @@ public class SeatunnelTaskChannel implements TaskChannel {
     public SeatunnelTask createTask(TaskExecutionContext taskRequest) {
         SeatunnelParameters seatunnelParameters =
                 JSONUtils.parseObject(taskRequest.getTaskParams(), SeatunnelParameters.class);
-        if (EngineEnum.FLINK == seatunnelParameters.getEngine()) {
-            return new SeatunnelFlinkTask(taskRequest);
-        } else if (EngineEnum.SPARK == seatunnelParameters.getEngine()) {
-            return new SeatunnelSparkTask(taskRequest);
+        switch (seatunnelParameters.getEngine()) {
+            case FLINK:
+            case FLINK_V2:
+                return new SeatunnelFlinkTask(taskRequest);
+            case SPARK:
+            case SPARK_V2:
+                return new SeatunnelSparkTask(taskRequest);
+            case SEATUNNEL_ENGINE:
+                return new SeatunnelEngineTask(taskRequest);
         }
         throw new IllegalArgumentException("Unsupported engine type:" + seatunnelParameters.getEngine());
     }

+ 44 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel.self;
+
+import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
+
+public class SeatunnelEngineParameters extends SeatunnelParameters {
+
+    private DeployModeEnum deployMode;
+
+    private String others;
+
+    public DeployModeEnum getDeployMode() {
+        return deployMode;
+    }
+
+    public void setDeployMode(DeployModeEnum deployMode) {
+        this.deployMode = deployMode;
+    }
+
+    public String getOthers() {
+        return others;
+    }
+
+    public void setOthers(String others) {
+        this.others = others;
+    }
+}

+ 58 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel.self;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.Constants;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelTask;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+
+public class SeatunnelEngineTask extends SeatunnelTask {
+
+    private SeatunnelEngineParameters seatunnelParameters;
+    public SeatunnelEngineTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+    }
+
+    @Override
+    public void init() {
+        seatunnelParameters =
+                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelEngineParameters.class);
+        setSeatunnelParameters(seatunnelParameters);
+        super.init();
+    }
+
+    @Override
+    public List<String> buildOptions() throws Exception {
+        List<String> args = super.buildOptions();
+        if (!Objects.isNull(seatunnelParameters.getDeployMode())) {
+            args.add(Constants.DEPLOY_MODE_OPTIONS);
+            args.add(seatunnelParameters.getDeployMode().getCommand());
+        }
+        if (StringUtils.isNotBlank(seatunnelParameters.getOthers())) {
+            args.add(seatunnelParameters.getOthers());
+        }
+        return args;
+    }
+
+}

+ 4 - 1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts

@@ -21,7 +21,8 @@ import type { IJsonItem, IOption } from '../types'
 export function useDeployMode(
   span: number | Ref<number> = 24,
   showClient = ref(true),
-  showCluster = ref(true)
+  showCluster = ref(true),
+  showLocal = ref(true)
 ): IJsonItem {
   const { t } = useI18n()
 
@@ -34,6 +35,8 @@ export function useDeployMode(
           return showCluster.value
         case 'client':
           return showClient.value
+        case 'local':
+          return showLocal.value
         default:
           return true
       }

+ 22 - 7
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts

@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { computed } from 'vue'
+import { computed, ref } from 'vue'
 import { useI18n } from 'vue-i18n'
 import { useDeployMode, useResources, useCustomParams } from '.'
 import type { IJsonItem } from '../types'
@@ -24,18 +24,21 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
 
   const configEditorSpan = computed(() => (model.useCustom ? 24 : 0))
   const resourceEditorSpan = computed(() => (model.useCustom ? 0 : 24))
-  const flinkSpan = computed(() => (model.engine === 'FLINK' ? 24 : 0))
-  const deployModeSpan = computed(() => (model.engine === 'SPARK' ? 24 : 0))
+  const flinkSpan = computed(() => (model.engine === 'FLINK' || model.engine === 'FLINK_V2' ? 24 : 0))
+  const deployModeSpan = computed(() => (model.engine === 'SPARK' || model.engine === 'SPARK_V2' || model.engine === "SEATUNNEL_ENGINE" ? 24 : 0))
   const masterSpan = computed(() =>
-    model.engine === 'SPARK' && model.deployMode !== 'local' ? 12 : 0
+      (model.engine === 'SPARK' || model.engine === 'SPARK_V2') && model.deployMode !== 'local' ? 12 : 0
   )
   const masterUrlSpan = computed(() =>
-    model.engine === 'SPARK' &&
+    (model.engine === 'SPARK' || model.engine === 'SPARK_V2') &&
     model.deployMode !== 'local' &&
     (model.master === 'SPARK' || model.master === 'MESOS')
       ? 12
       : 0
   )
+  const showClient = computed(() => model.engine === 'SPARK' || model.engine === 'SPARK_V2')
+  const showLocal = computed(() => model.engine === 'SEATUNNEL_ENGINE')
+  const othersSpan = computed(() => (model.engine === 'FLINK' || model.engine === 'FLINK_V2' || model.engine === 'SEATUNNEL_ENGINE' ? 24 : 0))
 
   return [
     {
@@ -64,7 +67,7 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
       type: 'input',
       field: 'others',
       name: t('project.node.option_parameters'),
-      span: flinkSpan,
+      span: othersSpan,
       props: {
         type: 'textarea',
         placeholder: t('project.node.option_parameters_tips')
@@ -72,7 +75,7 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
     },
 
     // SeaTunnel spark parameter
-    useDeployMode(deployModeSpan),
+    useDeployMode(deployModeSpan, showClient, ref(true), showLocal),
     {
       type: 'select',
       field: 'master',
@@ -135,6 +138,18 @@ export const ENGINE = [
   {
     label: 'FLINK',
     value: 'FLINK'
+  },
+  {
+    label: 'SPARK_V2',
+    value: 'SPARK_V2'
+  },
+  {
+    label: 'FLINK_V2',
+    value: 'FLINK_V2'
+  },
+  {
+    label: 'SEATUNNEL_ENGINE',
+    value: 'SEATUNNEL_ENGINE'
   }
 ]