Browse Source

no valid worker group,master can kill task directly (#2541)

* dispatch task fail will set task status failed

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* send mail error, #2466 bug fix

* send mail error, #2466 bug fix

* send mail error, #2466 bug fix

* send mail error, #2466 bug fix

* #2486 bug fix

* host and workergroup compatible

* EnterpriseWeChatUtils modify

* EnterpriseWeChatUtils modify

* EnterpriseWeChatUtils modify

* #2499 bug fix

* add comment

* revert comment

* revert comment

* #2499 buf fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* #2499 bug fix

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

* no valid worker group,master can kill task directly

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
qiaozhanwei 5 years ago
parent
commit
a12103b08c

+ 14 - 2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -120,19 +120,31 @@ public class TaskPriorityQueueConsumer extends Thread{
         Boolean result = false;
         while (Stopper.isRunning()){
             try {
-                result =  dispatcher.dispatch(executionContext);
+                result = dispatcher.dispatch(executionContext);
             } catch (ExecuteException e) {
                 logger.error("dispatch error",e);
                 ThreadUtils.sleep(SLEEP_TIME_MILLIS);
             }
 
-            if (result){
+            if (result || taskInstanceIsFinalState(taskInstanceId)){
                 break;
             }
         }
         return result;
     }
 
+
+    /**
+     * taskInstance is final state
+     * success,failure,kill,stop,pause,threadwaiting is final state
+     * @param taskInstanceId taskInstanceId
+     * @return taskInstance is final state
+     */
+    public Boolean taskInstanceIsFinalState(int taskInstanceId){
+        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        return taskInstance.getState().typeIsFinished();
+    }
+
     /**
      * get TaskExecutionContext
      * @param taskInstanceId taskInstanceId

+ 45 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@@ -35,9 +36,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Date;
+import java.util.Set;
 
 
 /**
@@ -53,6 +57,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
     private NettyExecutorManager nettyExecutorManager;
 
+
+    /**
+     * zookeeper register center
+     */
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
     /**
      * constructor of MasterTaskExecThread
      * @param taskInstance      task instance
@@ -61,6 +71,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         super(taskInstance);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
         this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
+        this.zookeeperRegistryCenter = SpringApplicationContext.getBean(ZookeeperRegistryCenter.class);
     }
 
     /**
@@ -175,6 +186,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         }
         alreadyKilled = true;
 
+        String taskInstanceWorkerGroup = taskInstance.getWorkerGroup();
+
+        // not exists
+        if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){
+            taskInstance.setState(ExecutionStatus.KILL);
+            taskInstance.setEndTime(new Date());
+            processService.updateTaskInstance(taskInstance);
+            return;
+        }
+
         TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
         killCommand.setTaskInstanceId(taskInstance.getId());
 
@@ -185,10 +206,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
         nettyExecutorManager.executeDirectly(executionContext);
 
-        logger.info("master add kill task :{} id:{} to kill queue",
+        logger.info("master kill taskInstance name :{} taskInstance id:{}",
                 taskInstance.getName(), taskInstance.getId() );
     }
 
+    /**
+     * whether exists valid worker group
+     * @param taskInstanceWorkerGroup taskInstanceWorkerGroup
+     * @return whether exists
+     */
+    public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup){
+        Set<String> workerGroups = zookeeperRegistryCenter.getWorkerGroupDirectly();
+        // not worker group
+        if (CollectionUtils.isEmpty(workerGroups)){
+            return false;
+        }
+
+        // has worker group , but not taskInstance assigned worker group
+        if (!workerGroups.contains(taskInstanceWorkerGroup)){
+            return false;
+        }
+        Set<String> workers = zookeeperRegistryCenter.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup);
+        if (CollectionUtils.isEmpty(workers)) {
+            return false;
+        }
+        return true;
+    }
+
     /**
      * get task timeout parameter
      * @return TaskTimeoutParameter

File diff suppressed because it is too large
+ 262 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java


+ 80 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import junit.framework.Assert;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.registry.DependencyConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class,
+        NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
+        ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class})
+public class MasterTaskExecThreadTest {
+
+    @Test
+    public void testExistsValidWorkerGroup1(){
+        ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
+        Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(null);
+        MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null);
+        masterTaskExecThread.existsValidWorkerGroup("default");
+    }
+    @Test
+    public void testExistsValidWorkerGroup2(){
+        ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
+        Set<String> workerGorups = new HashSet<>();
+        workerGorups.add("test1");
+        workerGorups.add("test2");
+
+        Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups);
+        MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null);
+        masterTaskExecThread.existsValidWorkerGroup("default");
+    }
+
+    @Test
+    public void testExistsValidWorkerGroup3(){
+        ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
+        Set<String> workerGorups = new HashSet<>();
+        workerGorups.add("test1");
+
+        Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups);
+        Mockito.when(zookeeperRegistryCenter.getWorkerGroupNodesDirectly("test1")).thenReturn(workerGorups);
+        MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null);
+        masterTaskExecThread.existsValidWorkerGroup("test1");
+    }
+
+
+}

+ 8 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

@@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.registry;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
 import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.mockito.Mockito;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -144,4 +147,9 @@ public class DependencyConfig {
     public TaskResponseService taskResponseService(){
         return Mockito.mock(TaskResponseService.class);
     }
+
+    @Bean
+    public TaskPriorityQueue taskPriorityQueue(){
+        return new TaskPriorityQueueImpl();
+    }
 }

+ 2 - 0
pom.xml

@@ -778,6 +778,8 @@
                         <include>**/server/log/TaskLogDiscriminatorTest.java</include>
                         <include>**/server/log/TaskLogFilterTest.java</include>
                         <include>**/server/log/WorkerLogFilterTest.java</include>
+                        <include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
+                        <include>**/server/master/runner/MasterTaskExecThreadTest.java</include>
                         <include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
                         <include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
                         <include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>