Ver Fonte

1,add UT in pom 2,refactor TaskUpdateQueue (#2326)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify

* master and worker register ip  use OSUtils.getHost()

* ProcessInstance host set ip:port format

* worker fault tolerance modify

* Constants and .env modify

* master fault tolerant bug modify

* UT add pom.xml

* timing online  modify

* when taskResponse is faster than taskAck to db,task state will error
add async queue and new a thread reslove this problem

* TaskExecutionContext set host

* 1,TaskManager refactor
2, api start load server dolphinschedule-daemon.sh modify

* 1,TaskManager refactor
2, api start load server dolphinschedule-daemon.sh modify

* add UT in pom.xml

* revert dolphinscheduler-daemon.sh

* ZookeeperRegister use common.properties zookeeperRoot path

* api start exclude org.apache.dolphinscheduler.server.*

* ZookeeperRegister use common.properties zookeeperRoot path

* 1,api start load server filter
2,SHELL task exitStatusCode modify

* java doc error modify

* java doc error modify

* remove todo

* add UT in pom

* 1,add UT in pom
2,refactor TaskUpdateQueue

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
qiaozhanwei há 5 anos atrás
pai
commit
68cb81fdf5

+ 0 - 1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;

+ 0 - 2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,7 +45,6 @@ import java.util.List;
 import java.util.Map;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskQueueFactory.class})
 public class DataAnalysisServiceTest {
     
     @InjectMocks

+ 67 - 40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java

@@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,18 +53,18 @@ import java.util.List;
  * TaskUpdateQueue consumer
  */
 @Component
-public class TaskUpdateQueueConsumer extends Thread{
+public class TaskPriorityQueueConsumer extends Thread{
 
     /**
      * logger of TaskUpdateQueueConsumer
      */
-    private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);
 
     /**
      * taskUpdateQueue
      */
     @Autowired
-    private TaskUpdateQueue taskUpdateQueue;
+    private TaskPriorityQueue taskUpdateQueue;
 
     /**
      * processService
@@ -155,52 +155,19 @@ public class TaskUpdateQueueConsumer extends Thread{
         TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
         // SQL task
         if (taskType == TaskType.SQL){
-            SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
-            int datasourceId = sqlParameters.getDatasource();
-            DataSource datasource = processService.findDataSourceById(datasourceId);
-            sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
-
-            // whether udf type
-            boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
-                    && StringUtils.isNotEmpty(sqlParameters.getUdfs());
-
-            if (udfTypeFlag){
-                String[] udfFunIds = sqlParameters.getUdfs().split(",");
-                int[] udfFunIdsArray = new int[udfFunIds.length];
-                for(int i = 0 ; i < udfFunIds.length;i++){
-                    udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
-                }
-
-                List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
-                sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
-            }
+            setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
 
         }
 
         // DATAX task
         if (taskType == TaskType.DATAX){
-            DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
-
-            DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
-            DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
-
-
-            dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
-            dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-
-            dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
-            dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
         }
 
 
         // procedure task
         if (taskType == TaskType.PROCEDURE){
-            ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
-            int datasourceId = procedureParameters.getDatasource();
-            DataSource datasource = processService.findDataSourceById(datasourceId);
-            procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+            setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
         }
 
 
@@ -215,6 +182,66 @@ public class TaskUpdateQueueConsumer extends Thread{
                 .create();
     }
 
+    /**
+     * set procedure task relation
+     * @param procedureTaskExecutionContext procedureTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) {
+        ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
+        int datasourceId = procedureParameters.getDatasource();
+        DataSource datasource = processService.findDataSourceById(datasourceId);
+        procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+    }
+
+    /**
+     * set datax task relation
+     * @param dataxTaskExecutionContext dataxTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+        DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+
+        DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
+        DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+
+
+        dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+        dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+        dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+
+        dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+        dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+        dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+    }
+
+    /**
+     * set SQL task relation
+     * @param sqlTaskExecutionContext sqlTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) {
+        SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
+        int datasourceId = sqlParameters.getDatasource();
+        DataSource datasource = processService.findDataSourceById(datasourceId);
+        sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+        // whether udf type
+        boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+                && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+
+        if (udfTypeFlag){
+            String[] udfFunIds = sqlParameters.getUdfs().split(",");
+            int[] udfFunIdsArray = new int[udfFunIds.length];
+            for(int i = 0 ; i < udfFunIds.length;i++){
+                udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
+            }
+
+            List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
+            sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+        }
+    }
+
     /**
      * get execute local path
      *

+ 4 - 4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.dolphinscheduler.common.Constants.*;
@@ -76,7 +76,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
     /**
      * taskUpdateQueue
      */
-    private TaskUpdateQueue taskUpdateQueue;
+    private TaskPriorityQueue taskUpdateQueue;
     /**
      * constructor of MasterBaseTaskExecThread
      * @param taskInstance      task instance
@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-        this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
+        this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
     }
 
     /**

+ 1 - 1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java

@@ -17,7 +17,7 @@
 package org.apache.dolphinscheduler.service.queue;
 
 
-public interface TaskUpdateQueue {
+public interface TaskPriorityQueue {
 
     /**
      * put task info

+ 1 - 3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java

@@ -17,8 +17,6 @@
 package org.apache.dolphinscheduler.service.queue;
 
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -31,7 +29,7 @@ import static org.apache.dolphinscheduler.common.Constants.*;
  * tasks queue implementation
  */
 @Service
-public class TaskUpdateQueueImpl implements TaskUpdateQueue {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue {
     /**
      * queue size
      */

+ 0 - 55
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java

@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * task queue factory
- */
-public class TaskQueueFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(TaskQueueFactory.class);
-
-
-  private TaskQueueFactory(){
-
-  }
-
-
-  /**
-   * get instance (singleton)
-   *
-   * @return instance
-   */
-  public static TaskUpdateQueue getTaskQueueInstance() {
-    String queueImplValue = CommonUtils.getQueueImplValue();
-    if (StringUtils.isNotBlank(queueImplValue)) {
-        logger.info("task queue impl use zookeeper ");
-        return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
-    }else{
-      logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
-      System.exit(-1);
-    }
-
-    return null;
-  }
-}

+ 3 - 3
dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java

@@ -17,8 +17,8 @@
 
 package queue;
 
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -45,7 +45,7 @@ public class TaskUpdateQueueTest {
         String taskInfo3 = "1_1_0_3_default";
         String taskInfo4 = "1_1_0_4_default";
 
-        TaskUpdateQueue queue = new TaskUpdateQueueImpl();
+        TaskPriorityQueue queue = new TaskPriorityQueueImpl();
         queue.put(taskInfo1);
         queue.put(taskInfo2);
         queue.put(taskInfo3);

+ 9 - 14
pom.xml

@@ -717,23 +717,18 @@
                         <include>**/dao/mapper/AccessTokenMapperTest.java</include>
                         <include>**/dao/mapper/AlertGroupMapperTest.java</include>
                         <include>**/dao/mapper/AlertMapperTest.java</include>
-                        <include>**/dao/mapper/CommandMapperTest.java</include>
-                        <include>**/dao/cron/CronUtilsTest.java</include>
-                        <include>**/dao/utils/DagHelperTest.java</include>
-                        <include>**/server/worker/task/datax/DataxTaskTest.java</include>
-                        <include>**/server/utils/DataxUtilsTest.java</include>
-                        <include>**/server/utils/SparkArgsUtilsTest.java</include>
-                        <include>**/server/utils/FlinkArgsUtilsTest.java</include>
-                        <include>**/server/utils/ParamUtilsTest.java</include>
+                        <include>**/dao/mapper/DataSourceMapperTest.java</include>
                         <include>**/server/log/MasterLogFilterTest.java</include>
                         <include>**/server/log/SensitiveDataConverterTest.java</include>
                         <include>**/server/log/TaskLogDiscriminatorTest.java</include>
                         <include>**/server/log/TaskLogFilterTest.java</include>
                         <include>**/server/log/WorkerLogFilterTest.java</include>
-                        <include>**/server/master/executor/NettyExecutorManagerTest.java</include>
-                        <include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
-                        <include>**/server/master/host/RandomSelectorTest.java</include>
-                        <include>**/server/master/host/RoundRobinSelectorTest.java</include>
+                        <include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
+                        <include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
+                        <include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
+                        <include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
+                        <include>**/server/master/dispatch/host/RoundRobinHostManagerTest.java</include>
+                        <include>**/server/master/dispatch/ExecutorDispatcherTest.java</include>
                         <include>**/server/master/register/MasterRegistryTest.java</include>
                         <include>**/server/master/AlertManagerTest.java</include>
                         <include>**/server/master/MasterCommandTest.java</include>
@@ -741,19 +736,19 @@
                         <include>**/server/master/ParamsTest.java</include>
                         <include>**/server/register/ZookeeperNodeManagerTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
+                        <include>**/server/utils/ExecutionContextTestUtils.java</include>
                         <include>**/server/utils/FlinkArgsUtilsTest.java</include>
                         <include>**/server/utils/ParamUtilsTest.java</include>
                         <include>**/server/utils/ProcessUtilsTest.java</include>
                         <include>**/server/utils/SparkArgsUtilsTest.java</include>
                         <include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
-                        <include>**/server/worker/register/WorkerRegistryTest.java</include>
+                        <include>**/server/worker/registry/WorkerRegistryTest.java</include>
                         <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
                         <include>**/server/worker/sql/SqlExecutorTest.java</include>
                         <include>**/server/worker/task/datax/DataxTaskTest.java</include>
                         <include>**/server/worker/task/dependent/DependentTaskTest.java</include>
                         <include>**/server/worker/task/spark/SparkTaskTest.java</include>
                         <include>**/server/worker/task/EnvFileTest.java</include>
-
                     </includes>
                     <!-- <skip>true</skip> -->
                     <argLine>-Xmx2048m</argLine>