Browse Source

[Feature-10837][Task Plugin] Sqoop Task Add --split-by Parameter (#10960)

* [Feature-10837][Task Plugin] Sqoop Task Add --split-by Parameter

* checkstyle
Assert 2 years ago
parent
commit
cdfe115247

+ 2 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopConstants.java

@@ -30,6 +30,8 @@ public final class SqoopConstants {
     public static final String LINES_TERMINATED_BY = "--lines-terminated-by";
     public static final String FIELD_NULL_PLACEHOLDER = "--null-non-string 'NULL' --null-string 'NULL'";
 
+    public static final String SPLIT_BY = "--split-by";
+
     //sqoop db
     public static final String DB_CONNECT = "--connect";
     public static final String DB_USERNAME = "--username";

+ 4 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/CommonGenerator.java

@@ -79,6 +79,10 @@ public class CommonGenerator {
             if (sqoopParameters.getConcurrency() > 0) {
                 commonSb.append(SPACE).append(SqoopConstants.SQOOP_PARALLELISM)
                     .append(SPACE).append(sqoopParameters.getConcurrency());
+                if (sqoopParameters.getConcurrency() > 1) {
+                    commonSb.append(SPACE).append(SqoopConstants.SPLIT_BY)
+                        .append(SPACE).append(sqoopParameters.getSplitBy());
+                }
             }
         } catch (Exception e) {
             logger.error(String.format("Sqoop task general param build failed: [%s]", e.getMessage()));

+ 21 - 9
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java

@@ -35,7 +35,7 @@ import java.util.Objects;
 /**
  * sqoop parameters
  */
-public class SqoopParameters  extends AbstractParameters {
+public class SqoopParameters extends AbstractParameters {
 
     /**
      * sqoop job type:
@@ -62,6 +62,10 @@ public class SqoopParameters  extends AbstractParameters {
      * concurrency
      */
     private int concurrency;
+    /**
+     * split by
+     */
+    private String splitBy;
     /**
      * source type
      */
@@ -105,6 +109,14 @@ public class SqoopParameters  extends AbstractParameters {
         this.concurrency = concurrency;
     }
 
+    public String getSplitBy() {
+        return splitBy;
+    }
+
+    public void setSplitBy(String splitBy) {
+        this.splitBy = splitBy;
+    }
+
     public String getSourceType() {
         return sourceType;
     }
@@ -188,16 +200,16 @@ public class SqoopParameters  extends AbstractParameters {
 
         if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
             sqoopParamsCheck = StringUtils.isEmpty(customShell)
-                    && StringUtils.isNotEmpty(modelType)
-                    && StringUtils.isNotEmpty(jobName)
-                    && concurrency != 0
-                    && StringUtils.isNotEmpty(sourceType)
-                    && StringUtils.isNotEmpty(targetType)
-                    && StringUtils.isNotEmpty(sourceParams)
-                    && StringUtils.isNotEmpty(targetParams);
+                && StringUtils.isNotEmpty(modelType)
+                && StringUtils.isNotEmpty(jobName)
+                && concurrency != 0
+                && StringUtils.isNotEmpty(sourceType)
+                && StringUtils.isNotEmpty(targetType)
+                && StringUtils.isNotEmpty(sourceParams)
+                && StringUtils.isNotEmpty(targetParams);
         } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
             sqoopParamsCheck = StringUtils.isNotEmpty(customShell)
-                    && StringUtils.isEmpty(jobName);
+                && StringUtils.isEmpty(jobName);
         }
 
         return sqoopParamsCheck;

+ 2 - 0
dolphinscheduler-ui/src/locales/en_US/project.ts

@@ -497,6 +497,8 @@ export default {
     allow_insert: 'AllowInsert',
     concurrency: 'Concurrency',
     concurrency_tips: 'Please enter Concurrency',
+    concurrency_column: 'Concurrency Column',
+    concurrency_column_tips: 'Please enter Concurrency Column',
     sea_tunnel_master: 'Master',
     sea_tunnel_master_url: 'Master URL',
     sea_tunnel_queue: 'Queue',

+ 2 - 0
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@@ -491,6 +491,8 @@ export default {
     allow_insert: '无更新便插入',
     concurrency: '并发度',
     concurrency_tips: '请输入并发度',
+    concurrency_column: '并发键',
+    concurrency_column_tips: '请输入并发键',
     sea_tunnel_master: 'Master',
     sea_tunnel_master_url: 'Master URL',
     sea_tunnel_queue: '队列',

+ 12 - 1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sqoop.ts

@@ -24,6 +24,8 @@ export function useSqoop(model: { [field: string]: any }): IJsonItem[] {
   const customSpan = computed(() => (model.isCustomTask ? 24 : 0))
   const unCustomSpan = computed(() => (model.isCustomTask ? 0 : 24))
 
+  const unCustomHalfSpan = computed(() => (model.isCustomTask ? 0 : 12))
+
   return [
     {
       type: 'switch',
@@ -75,12 +77,21 @@ export function useSqoop(model: { [field: string]: any }): IJsonItem[] {
       type: 'input-number',
       field: 'concurrency',
       name: t('project.node.concurrency'),
-      span: unCustomSpan,
+      span: unCustomHalfSpan,
       props: {
         placeholder: t('project.node.concurrency_tips'),
         min: 1
       }
     },
+    {
+      type: 'input',
+      field: 'splitBy',
+      name: t('project.node.concurrency_column'),
+      span: unCustomHalfSpan,
+      props: {
+        placeholder: t('project.node.concurrency_column_tips')
+      }
+    },
     {
       type: 'editor',
       field: 'customShell',

+ 1 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@@ -88,6 +88,7 @@ export function formatParams(data: INodeData): {
       taskParams.hadoopCustomParams = data.hadoopCustomParams
       taskParams.sqoopAdvancedParams = data.sqoopAdvancedParams
       taskParams.concurrency = data.concurrency
+      taskParams.splitBy = data.splitBy
       taskParams.modelType = data.modelType
       taskParams.sourceType = data.sourceType
       taskParams.targetType = data.targetType

+ 1 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts

@@ -65,6 +65,7 @@ export function useSqoop({
     targetHiveDropDelimiter: false,
     targetHiveOverWrite: true,
     concurrency: 1,
+    splitBy: '',
     timeoutNotifyStrategy: ['WARN']
   } as INodeData)
 

+ 1 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@@ -265,6 +265,7 @@ interface ITaskParams {
   hadoopCustomParams?: ILocalParam[]
   sqoopAdvancedParams?: ILocalParam[]
   concurrency?: number
+  splitBy?: string
   modelType?: ModelType
   sourceType?: SourceType
   targetType?: SourceType