Browse Source

add TaskResponseServiceTest (#2325)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test

* move mybatis config from application.properties to SpringConnectionFactory

* move mybatis-plus config from application.properties to SpringConnectionFactory

* refactor TaskCallbackService

* add ZookeeperNodeManagerTest

* add NettyExecutorManagerTest

* refactor TaskKillProcessor

* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest

* add RoundRobinHostManagerTest, ExecutorDispatcherTest

* refactor task response service

* add TaskResponseServiceTest
Tboy 5 years ago
parent
commit
23ba035182

+ 63 - 37
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@@ -25,6 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -37,12 +40,12 @@ public class TaskResponseService {
     /**
      * logger
      */
-    private static final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
 
     /**
      * attemptQueue
      */
-    private final BlockingQueue<TaskResponseEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
+    private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>(5000);
 
 
     /**
@@ -51,12 +54,29 @@ public class TaskResponseService {
     @Autowired
     private ProcessService processService;
 
+    /**
+     * task response worker
+     */
+    private Thread taskResponseWorker;
+
 
     @PostConstruct
-    public void init(){
-        TaskWorker taskWorker = new TaskWorker();
-        taskWorker.setName("TaskWorkerThread");
-        taskWorker.start();
+    public void start(){
+        this.taskResponseWorker = new TaskResponseWorker();
+        this.taskResponseWorker.setName("TaskResponseWorker");
+        this.taskResponseWorker.start();
+    }
+
+    @PreDestroy
+    public void stop(){
+        this.taskResponseWorker.interrupt();
+        if(!eventQueue.isEmpty()){
+            List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
+            eventQueue.drainTo(remainEvents);
+            for(TaskResponseEvent event : remainEvents){
+                this.persist(event);
+            }
+        }
     }
 
     /**
@@ -66,7 +86,7 @@ public class TaskResponseService {
      */
     public void addResponse(TaskResponseEvent taskResponseEvent){
         try {
-            attemptQueue.put(taskResponseEvent);
+            eventQueue.put(taskResponseEvent);
         } catch (InterruptedException e) {
             logger.error("put task : {} error :{}", taskResponseEvent,e);
         }
@@ -76,7 +96,7 @@ public class TaskResponseService {
     /**
      * task worker thread
      */
-    class TaskWorker extends Thread {
+    class TaskResponseWorker extends Thread {
 
         @Override
         public void run() {
@@ -84,41 +104,47 @@ public class TaskResponseService {
             while (Stopper.isRunning()){
                 try {
                     // if not task , blocking here
-                    TaskResponseEvent taskResponseEvent = attemptQueue.take();
+                    TaskResponseEvent taskResponseEvent = eventQueue.take();
                     persist(taskResponseEvent);
-
-                }catch (Exception e){
+                } catch (InterruptedException e){
+                    break;
+                } catch (Exception e){
                     logger.error("persist task error",e);
                 }
             }
+            logger.info("TaskResponseWorker stopped");
         }
+    }
 
-        /**
-         * persist  taskResponseEvent
-         * @param taskResponseEvent taskResponseEvent
-         */
-        private void persist(TaskResponseEvent taskResponseEvent){
-            TaskResponseEvent.Event event = taskResponseEvent.getEvent();
-
-            switch (event){
-                case ACK:
-                    processService.changeTaskState(taskResponseEvent.getState(),
-                            taskResponseEvent.getStartTime(),
-                            taskResponseEvent.getWorkerAddress(),
-                            taskResponseEvent.getExecutePath(),
-                            taskResponseEvent.getLogPath(),
-                            taskResponseEvent.getTaskInstanceId());
-                    break;
-                case RESULT:
-                    processService.changeTaskState(taskResponseEvent.getState(),
-                            taskResponseEvent.getEndTime(),
-                            taskResponseEvent.getProcessId(),
-                            taskResponseEvent.getAppIds(),
-                            taskResponseEvent.getTaskInstanceId());
-                    break;
-                default:
-                    throw new IllegalArgumentException("invalid event type : " + event);
-            }
+    /**
+     * persist  taskResponseEvent
+     * @param taskResponseEvent taskResponseEvent
+     */
+    private void persist(TaskResponseEvent taskResponseEvent){
+        TaskResponseEvent.Event event = taskResponseEvent.getEvent();
+
+        switch (event){
+            case ACK:
+                processService.changeTaskState(taskResponseEvent.getState(),
+                        taskResponseEvent.getStartTime(),
+                        taskResponseEvent.getWorkerAddress(),
+                        taskResponseEvent.getExecutePath(),
+                        taskResponseEvent.getLogPath(),
+                        taskResponseEvent.getTaskInstanceId());
+                break;
+            case RESULT:
+                processService.changeTaskState(taskResponseEvent.getState(),
+                        taskResponseEvent.getEndTime(),
+                        taskResponseEvent.getProcessId(),
+                        taskResponseEvent.getAppIds(),
+                        taskResponseEvent.getTaskInstanceId());
+                break;
+            default:
+                throw new IllegalArgumentException("invalid event type : " + event);
         }
     }
+
+    public BlockingQueue<TaskResponseEvent> getEventQueue() {
+        return eventQueue;
+    }
 }

+ 66 - 0
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.registry.DependencyConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.Date;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class,
+        ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class})
+public class TaskResponseServiceTest {
+
+    @Autowired
+    private TaskResponseService taskResponseService;
+
+    @Test
+    public void testAdd(){
+        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
+                "", "", "", 1);
+        taskResponseService.addResponse(taskResponseEvent);
+        Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
+        try {
+            Thread.sleep(10);
+        } catch (InterruptedException ignore) {
+        }
+        //after sleep, inner worker will take the event
+        Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
+    }
+
+    @Test
+    public void testStop(){
+        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
+                "", "", "", 1);
+        taskResponseService.addResponse(taskResponseEvent);
+        taskResponseService.stop();
+        Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
+    }
+}