Browse Source

sqoop task optimization

eights 4 years ago
parent
commit
c8f28ab2ba

+ 1 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@@ -218,7 +218,7 @@ public enum Status {
     DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"),
     DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"),
     PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"),
-    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"),
+    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"),
     PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
     DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"),
     SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"),

+ 41 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+public enum  SqoopJobType {
+    CUSTOM(0, "CUSTOM"),
+    TEMPLATE(1, "TEMPLATE");
+
+    SqoopJobType(int code, String descp){
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDescp() {
+        return descp;
+    }
+}

+ 92 - 7
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java

@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.common.task.sqoop;
 
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -28,6 +30,23 @@ import java.util.List;
  */
 public class SqoopParameters  extends AbstractParameters {
 
+    /**
+     * sqoop job type:
+     * CUSTOM - custom sqoop job
+     * TEMPLATE - sqoop template job
+     */
+    private String jobType;
+
+    /**
+     * customJob eq 1, use customShell
+     */
+    private String customShell;
+
+    /**
+     * sqoop job name - map-reduce job name
+     */
+    private String jobName;
+
     /**
      * model type
      */
@@ -53,6 +72,16 @@ public class SqoopParameters  extends AbstractParameters {
      */
     private String targetParams;
 
+    /**
+     * hadoop custom param for sqoop job
+     */
+    private List<Property> hadoopCustomParams;
+
+    /**
+     * sqoop advanced param
+     */
+    private List<Property> sqoopAdvancedParams;
+
     public String getModelType() {
         return modelType;
     }
@@ -101,18 +130,74 @@ public class SqoopParameters  extends AbstractParameters {
         this.targetParams = targetParams;
     }
 
+    public String getJobType() {
+        return jobType;
+    }
+
+    public void setJobType(String jobType) {
+        this.jobType = jobType;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getCustomShell() {
+        return customShell;
+    }
+
+    public void setCustomShell(String customShell) {
+        this.customShell = customShell;
+    }
+
+    public List<Property> getHadoopCustomParams() {
+        return hadoopCustomParams;
+    }
+
+    public void setHadoopCustomParams(List<Property> hadoopCustomParams) {
+        this.hadoopCustomParams = hadoopCustomParams;
+    }
+
+    public List<Property> getSqoopAdvancedParams() {
+        return sqoopAdvancedParams;
+    }
+
+    public void setSqoopAdvancedParams(List<Property> sqoopAdvancedParams) {
+        this.sqoopAdvancedParams = sqoopAdvancedParams;
+    }
+
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(modelType)&&
-                concurrency != 0 &&
-                StringUtils.isNotEmpty(sourceType)&&
-                StringUtils.isNotEmpty(targetType)&&
-                StringUtils.isNotEmpty(sourceParams)&&
-                StringUtils.isNotEmpty(targetParams);
+
+        boolean sqoopParamsCheck = false;
+
+        if (StringUtils.isEmpty(jobType)) {
+            return sqoopParamsCheck;
+        }
+
+        if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
+            sqoopParamsCheck = StringUtils.isEmpty(customShell) &&
+                    StringUtils.isNotEmpty(modelType) &&
+                    StringUtils.isNotEmpty(jobName) &&
+                    concurrency != 0 &&
+                    StringUtils.isNotEmpty(sourceType) &&
+                    StringUtils.isNotEmpty(targetType) &&
+                    StringUtils.isNotEmpty(sourceParams) &&
+                    StringUtils.isNotEmpty(targetParams);
+        } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
+            sqoopParamsCheck = StringUtils.isNotEmpty(customShell) &&
+                    StringUtils.isEmpty(jobName);
+        }
+
+        return sqoopParamsCheck;
     }
 
     @Override
     public List<ResourceInfo> getResourceFilesList() {
-       return new ArrayList<>();
+        return new ArrayList<>();
     }
 }

+ 18 - 14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
 
 import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -258,29 +259,32 @@ public class TaskPriorityQueueConsumer extends Thread{
 
 
     /**
-     * set datax task relation
+     * set sqoop task relation
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
      * @param taskNode taskNode
      */
     private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
         SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);
 
-        SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
-        TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
+        // sqoop job type is template set task relation
+        if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
+            SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
+            TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
 
-        DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
-        DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
+            DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
+            DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
 
-        if (dataSource != null){
-            sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
-            sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-        }
+            if (dataSource != null){
+                sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
+                sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+                sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            }
 
-        if (dataTarget != null){
-            sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
-            sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            if (dataTarget != null){
+                sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
+                sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+                sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            }
         }
     }
 

+ 39 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java

@@ -16,10 +16,17 @@
  */
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
+
 /**
  * common script generator
  */
@@ -32,6 +39,38 @@ public class CommonGenerator {
         try{
             result.append("sqoop ")
                     .append(sqoopParameters.getModelType());
+
+            //set sqoop job name
+            result.append(" -D mapred.job.name")
+                    .append(Constants.EQUAL_SIGN)
+                    .append(sqoopParameters.getJobName());
+
+            //set hadoop custom param
+            List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
+            if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
+                for (Property hadoopCustomParam : hadoopCustomParams) {
+                    String hadoopCustomParamStr = " -D " + hadoopCustomParam.getProp()
+                            + Constants.EQUAL_SIGN + hadoopCustomParam.getValue();
+
+                    if (StringUtils.isNotEmpty(hadoopCustomParamStr)) {
+                        result.append(hadoopCustomParamStr);
+                    }
+                }
+            }
+
+            //set sqoop advanced custom param
+            List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
+            if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
+
+                for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
+                    String sqoopAdvancedParamStr = " " + sqoopAdvancedParam.getProp()
+                            + " " + sqoopAdvancedParam.getValue();
+                    if (StringUtils.isNotEmpty(sqoopAdvancedParamStr)) {
+                        result.append(sqoopAdvancedParamStr);
+                    }
+                }
+            }
+
             if(sqoopParameters.getConcurrency() >0){
                 result.append(" -m ")
                         .append(sqoopParameters.getConcurrency());

+ 7 - 7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java

@@ -77,19 +77,19 @@ public class MysqlSourceGenerator implements ISourceGenerator {
                         }else{
                             srcQuery += " WHERE $CONDITIONS";
                         }
-                        result.append(" --query \'"+srcQuery+"\'");
+                        result.append(" --query \'").append(srcQuery).append("\'");
 
                     }
 
                     List<Property>  mapColumnHive = sourceMysqlParameter.getMapColumnHive();
 
                     if(mapColumnHive != null && !mapColumnHive.isEmpty()){
-                        String columnMap = "";
+                        StringBuilder columnMap = new StringBuilder();
                         for(Property item:mapColumnHive){
-                            columnMap = item.getProp()+"="+ item.getValue()+",";
+                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
                         }
 
-                        if(StringUtils.isNotEmpty(columnMap)){
+                        if(StringUtils.isNotEmpty(columnMap.toString())){
                             result.append(" --map-column-hive ")
                                     .append(columnMap.substring(0,columnMap.length()-1));
                         }
@@ -98,12 +98,12 @@ public class MysqlSourceGenerator implements ISourceGenerator {
                     List<Property>  mapColumnJava = sourceMysqlParameter.getMapColumnJava();
 
                     if(mapColumnJava != null && !mapColumnJava.isEmpty()){
-                        String columnMap = "";
+                        StringBuilder columnMap = new StringBuilder();
                         for(Property item:mapColumnJava){
-                            columnMap = item.getProp()+"="+ item.getValue()+",";
+                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
                         }
 
-                        if(StringUtils.isNotEmpty(columnMap)){
+                        if(StringUtils.isNotEmpty(columnMap.toString())){
                             result.append(" --map-column-java ")
                                     .append(columnMap.substring(0,columnMap.length()-1));
                         }