Browse Source

UT Coverage rate test (#2276)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify

* master and worker register ip  use OSUtils.getHost()

* ProcessInstance host set ip:port format

* worker fault tolerance modify

* Constants and .env modify

* master fault tolerant bug modify

* UT add pom.xml

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
qiaozhanwei 5 years ago
parent
commit
052f9d10bf

+ 1 - 1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -121,7 +121,7 @@ public final class Constants {
     /**
      * MasterServer directory registered in zookeeper
      */
-    public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/masters";
+    public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
 
     /**
      * WorkerServer directory registered in zookeeper

+ 34 - 3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@@ -725,7 +725,7 @@ public class MasterExecThread implements Runnable {
         ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
         ExecutionStatus state = instance.getState();
 
-        if(activeTaskNode.size() > 0){
+        if(activeTaskNode.size() > 0 || haveRetryTaskStandBy()){
             return runningState(state);
         }
         // process failure
@@ -768,6 +768,24 @@ public class MasterExecThread implements Runnable {
         return state;
     }
 
+    /**
+     * whether standby task list have retry tasks
+     * @return
+     */
+    private boolean haveRetryTaskStandBy() {
+
+        boolean result = false;
+
+        for(String taskName : readyToSubmitTaskList.keySet()){
+            TaskInstance task = readyToSubmitTaskList.get(taskName);
+            if(task.getState().typeIsFailure()){
+                result = true;
+                break;
+            }
+        }
+        return result;
+    }
+
     /**
      * whether complement end
      * @return Boolean whether is complement end
@@ -856,7 +874,11 @@ public class MasterExecThread implements Runnable {
         // submit start node
         submitPostNode(null);
         boolean sendTimeWarning = false;
-        while(!processInstance.IsProcessInstanceStop()){
+        while(Stopper.isRunning()){
+
+            if(processInstance.IsProcessInstanceStop()){
+                break;
+            }
 
             // send warning email if process time out.
             if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
@@ -871,12 +893,21 @@ public class MasterExecThread implements Runnable {
                 if(!future.isDone()){
                     continue;
                 }
+
                 // node monitor thread complete
-                activeTaskNode.remove(entry.getKey());
+                task = this.processService.findTaskInstanceById(task.getId());
+
                 if(task == null){
                     this.taskFailedSubmit = true;
+                    activeTaskNode.remove(entry.getKey());
                     continue;
                 }
+
+                // node monitor thread complete
+                if(task.getState().typeIsFinished()){
+                    activeTaskNode.remove(entry.getKey());
+                }
+
                 logger.info("task :{}, id:{} complete, state is {} ",
                         task.getName(), task.getId(), task.getState().toString());
                 //TODO  node success , post node submit

+ 30 - 6
pom.xml

@@ -712,20 +712,44 @@
                         <include>**/alert/utils/FuncUtilsTest.java</include>
                         <include>**/alert/utils/JSONUtilsTest.java</include>
                         <include>**/alert/utils/PropertyUtilsTest.java</include>
-                        <include>**/server/utils/SparkArgsUtilsTest.java</include>
-                        <include>**/server/utils/FlinkArgsUtilsTest.java</include>
-                        <include>**/server/utils/ParamUtilsTest.java</include>
-                        <include>**/server/master/MasterExecThreadTest.java</include>
+                        <include>**/alert/template/AlertTemplateFactoryTest.java</include>
+                        <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
                         <include>**/dao/mapper/AccessTokenMapperTest.java</include>
                         <include>**/dao/mapper/AlertGroupMapperTest.java</include>
                         <include>**/dao/mapper/AlertMapperTest.java</include>
                         <include>**/dao/mapper/CommandMapperTest.java</include>
                         <include>**/dao/cron/CronUtilsTest.java</include>
                         <include>**/dao/utils/DagHelperTest.java</include>
-                        <include>**/alert/template/AlertTemplateFactoryTest.java</include>
-                        <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
                         <include>**/server/worker/task/datax/DataxTaskTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
+                        <include>**/server/utils/SparkArgsUtilsTest.java</include>
+                        <include>**/server/utils/FlinkArgsUtilsTest.java</include>
+                        <include>**/server/utils/ParamUtilsTest.java</include>
+                        <include>**/server/log/MasterLogFilterTest.java</include>
+                        <include>**/server/log/SensitiveDataConverterTest.java</include>
+                        <include>**/server/log/TaskLogDiscriminatorTest.java</include>
+                        <include>**/server/log/TaskLogFilterTest.java</include>
+                        <include>**/server/log/WorkerLogFilterTest.java</include>
+                        <include>**/server/master/executor/NettyExecutorManagerTest.java</include>
+                        <include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
+                        <include>**/server/master/register/MasterRegistryTest.java</include>
+                        <include>**/server/master/AlertManagerTest.java</include>
+                        <include>**/server/master/MasterCommandTest.java</include>
+                        <include>**/server/master/MasterExecThreadTest.java</include>
+                        <include>**/server/master/ParamsTest.java</include>
+                        <include>**/server/register/ZookeeperNodeManagerTest.java</include>
+                        <include>**/server/utils/DataxUtilsTest.java</include>
+                        <include>**/server/utils/FlinkArgsUtilsTest.java</include>
+                        <include>**/server/utils/ParamUtilsTest.java</include>
+                        <include>**/server/utils/ProcessUtilsTest.java</include>
+                        <include>**/server/utils/SparkArgsUtilsTest.java</include>
+                        <include>**/server/worker/register/WorkerRegistryTest.java</include>
+                        <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
+                        <include>**/server/worker/sql/SqlExecutorTest.java</include>
+                        <include>**/server/worker/task/datax/DataxTaskTest.java</include>
+                        <include>**/server/worker/task/dependent/DependentTaskTest.java</include>
+                        <include>**/server/worker/task/spark/SparkTaskTest.java</include>
+                        <include>**/server/worker/task/EnvFileTest.java</include>
                     </includes>
                     <!-- <skip>true</skip> -->
                     <argLine>-Xmx2048m</argLine>