Selaa lähdekoodia

refactor-worker merge to dev bug fix

qiaozhanwei 5 vuotta sitten
vanhempi
commit
2ec668df9b

+ 14 - 6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@@ -18,10 +18,7 @@
 package org.apache.dolphinscheduler.server.builder;
 
 import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.*;
 
 /**
  *  TaskExecutionContext builder
@@ -111,14 +108,25 @@ public class TaskExecutionContextBuilder {
     /**
      * build procedureTask related info
      *
-     * @param procedureTaskExecutionContext
-     * @return
+     * @param procedureTaskExecutionContext procedureTaskExecutionContext
+     * @return TaskExecutionContextBuilder
      */
     public TaskExecutionContextBuilder buildProcedureTaskRelatedInfo(ProcedureTaskExecutionContext procedureTaskExecutionContext){
         taskExecutionContext.setProcedureTaskExecutionContext(procedureTaskExecutionContext);
         return this;
     }
 
+    /**
+     * build sqoopTask related info
+     *
+     * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
+     * @return TaskExecutionContextBuilder
+     */
+    public TaskExecutionContextBuilder buildSqoopTaskRelatedInfo(SqoopTaskExecutionContext sqoopTaskExecutionContext){
+        taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext);
+        return this;
+    }
+
 
     /**
      * create

+ 27 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -159,6 +159,7 @@ public class TaskPriorityQueueConsumer extends Thread{
         SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
         DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
         ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
+        SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
 
 
         // SQL task
@@ -178,6 +179,10 @@ public class TaskPriorityQueueConsumer extends Thread{
             setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
         }
 
+        if (taskType == TaskType.SQOOP){
+            setSqoopTaskRelation(sqoopTaskExecutionContext,taskNode);
+        }
+
 
         return TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
@@ -222,6 +227,28 @@ public class TaskPriorityQueueConsumer extends Thread{
         dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
     }
 
+
+    /**
+     * set datax task relation
+     * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
+        DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+
+        DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
+        DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+
+
+        sqoopTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+        sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+        sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+
+        sqoopTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+        sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+        sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+    }
+
     /**
      * set SQL task relation
      * @param sqlTaskExecutionContext sqlTaskExecutionContext