Browse Source

task prioriry refator (#2094)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue
qiaozhanwei 5 years ago
parent
commit
8aaee39aa2
16 changed files with 588 additions and 889 deletions
  1. 2 0
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 14 0
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  3. 146 0
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
  4. 169 0
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
  5. 4 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  6. 46 106
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  7. 4 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  8. 2 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  9. 2 40
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 0 102
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
  11. 2 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
  12. 0 375
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
  13. 23 27
      dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
  14. 115 0
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
  15. 0 229
      dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
  16. 59 0
      dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java

+ 2 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -1004,4 +1004,6 @@ public final class Constants {
      * default worker group
      */
     public static final String DEFAULT_WORKER_GROUP = "default";
+
+    public static final Integer TASK_INFO_LENGTH = 5;
 }

+ 14 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@@ -191,6 +191,11 @@ public class TaskInstance implements Serializable {
      */
     private int workerGroupId;
 
+    /**
+     * workerGroup
+     */
+    private String workerGroup;
+
     public ProcessInstance getProcessInstance() {
         return processInstance;
     }
@@ -460,6 +465,14 @@ public class TaskInstance implements Serializable {
         this.dependentResult = dependentResult;
     }
 
+    public String getWorkerGroup() {
+        return workerGroup;
+    }
+
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
+
     @Override
     public String toString() {
         return "TaskInstance{" +
@@ -492,6 +505,7 @@ public class TaskInstance implements Serializable {
                 ", processInstancePriority=" + processInstancePriority +
                 ", dependentResult='" + dependentResult + '\'' +
                 ", workerGroupId=" + workerGroupId +
+                ", workerGroup='" + workerGroup + '\'' +
                 '}';
     }
 }

+ 146 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java

@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+/**
+ *  task priority info
+ */
+public class TaskPriority {
+
+    /**
+     * processInstancePriority
+     */
+    private int processInstancePriority;
+
+    /**
+     * processInstanceId
+     */
+    private int processInstanceId;
+
+    /**
+     * taskInstancePriority
+     */
+    private int taskInstancePriority;
+
+    /**
+     * taskId
+     */
+    private int taskId;
+
+    /**
+     * groupName
+     */
+    private String groupName;
+
+    /**
+     *   ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+     */
+    private String taskPriorityInfo;
+
+    public TaskPriority(){}
+
+    public TaskPriority(int processInstancePriority,
+                        int processInstanceId,
+                        int taskInstancePriority,
+                        int taskId, String groupName) {
+        this.processInstancePriority = processInstancePriority;
+        this.processInstanceId = processInstanceId;
+        this.taskInstancePriority = taskInstancePriority;
+        this.taskId = taskId;
+        this.groupName = groupName;
+        this.taskPriorityInfo = this.processInstancePriority +
+                UNDERLINE +
+                this.processInstanceId +
+                UNDERLINE +
+                this.taskInstancePriority +
+                UNDERLINE +
+                this.taskId +
+                UNDERLINE +
+                this.groupName;
+    }
+
+    public int getProcessInstancePriority() {
+        return processInstancePriority;
+    }
+
+    public void setProcessInstancePriority(int processInstancePriority) {
+        this.processInstancePriority = processInstancePriority;
+    }
+
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
+    public int getTaskInstancePriority() {
+        return taskInstancePriority;
+    }
+
+    public void setTaskInstancePriority(int taskInstancePriority) {
+        this.taskInstancePriority = taskInstancePriority;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public String getTaskPriorityInfo() {
+        return taskPriorityInfo;
+    }
+
+    public void setTaskPriorityInfo(String taskPriorityInfo) {
+        this.taskPriorityInfo = taskPriorityInfo;
+    }
+
+    /**
+     * taskPriorityInfo convert taskPriority
+     *
+     * @param taskPriorityInfo taskPriorityInfo
+     * @return TaskPriority
+     */
+    public static TaskPriority of(String taskPriorityInfo){
+        String[] parts = taskPriorityInfo.split(UNDERLINE);
+        if (parts.length != 4) {
+            throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
+        }
+        TaskPriority taskPriority = new TaskPriority(
+                Integer.parseInt(parts[0]),
+                Integer.parseInt(parts[1]),
+                Integer.parseInt(parts[2]),
+                Integer.parseInt(parts[3]),
+                parts[4]);
+        return taskPriority;
+    }
+}

+ 169 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java

@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.consumer;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskPriority;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * TaskUpdateQueue consumer
+ */
+@Component
+public class TaskUpdateQueueConsumer extends Thread{
+
+    /**
+     * logger of TaskUpdateQueueConsumer
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
+
+    /**
+     * taskUpdateQueue
+     */
+    @Autowired
+    private TaskUpdateQueue taskUpdateQueue;
+
+    /**
+     * processService
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * executor dispatcher
+     */
+    @Autowired
+    private ExecutorDispatcher dispatcher;
+
+    @Override
+    public void run() {
+        while (Stopper.isRunning()){
+            try {
+                if (taskUpdateQueue.size() == 0){
+                    continue;
+                }
+                String taskPriorityInfo = taskUpdateQueue.take();
+                TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
+                dispatch(taskPriority.getTaskId());
+            }catch (Exception e){
+                logger.error("dispatcher task error",e);
+            }
+        }
+    }
+
+
+    /**
+     * TODO dispatch task
+     *
+     * @param taskInstanceId taskInstanceId
+     * @return result
+     */
+    private Boolean dispatch(int taskInstanceId){
+        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
+        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
+        try {
+            return dispatcher.dispatch(executionContext);
+        } catch (ExecuteException e) {
+            logger.error("execute exception", e);
+            return false;
+        }
+
+    }
+
+    /**
+     * get TaskExecutionContext
+     * @param taskInstanceId taskInstanceId
+     * @return TaskExecutionContext
+     */
+    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){
+        TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
+
+        Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
+
+        // verify tenant is null
+        if (verifyTenantIsNull(tenant, taskInstance)) {
+            processService.changeTaskState(ExecutionStatus.FAILURE,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    null,
+                    taskInstance.getId());
+            return null;
+        }
+        // set queue for process instance, user-specified queue takes precedence over tenant queue
+        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
+        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        taskInstance.setExecutePath(getExecLocalPath(taskInstance));
+
+        return TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+                .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+                .create();
+    }
+
+    /**
+     * get execute local path
+     *
+     * @return execute local path
+     */
+    private String getExecLocalPath(TaskInstance taskInstance){
+        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
+    }
+
+
+    /**
+     *  whehter tenant is null
+     * @param tenant tenant
+     * @param taskInstance taskInstance
+     * @return result
+     */
+    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+        if(tenant == null){
+            logger.error("tenant not exists,process instance id : {},task instance id : {}",
+                    taskInstance.getProcessInstance().getId(),
+                    taskInstance.getId());
+            return true;
+        }
+        return false;
+    }
+
+
+
+}

+ 4 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
@@ -66,12 +67,14 @@ public class TaskAckProcessor implements NettyRequestProcessor {
         logger.info("taskAckCommand : {}", taskAckCommand);
 
         taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
+
+        String workerAddress = ChannelUtils.toAddress(channel).getAddress();
         /**
          * change Task state
          */
         processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
                 taskAckCommand.getStartTime(),
-                taskAckCommand.getHost(),
+                workerAddress,
                 taskAckCommand.getExecutePath(),
                 taskAckCommand.getLogPath(),
                 taskAckCommand.getTaskInstanceId());

+ 46 - 106
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@@ -17,26 +17,18 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
-import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.dolphinscheduler.common.Constants.*;
 
 import java.util.concurrent.Callable;
 
@@ -71,11 +63,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
      */
     protected TaskInstance taskInstance;
 
-    /**
-     * task queue
-     */
-    protected ITaskQueue taskQueue;
-
     /**
      * whether need cancel
      */
@@ -86,12 +73,10 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
      */
     private MasterConfig masterConfig;
 
-
     /**
-     * executor dispatcher
+     * taskUpdateQueue
      */
-    private ExecutorDispatcher dispatcher;
-
+    private TaskUpdateQueue taskUpdateQueue;
     /**
      * constructor of MasterBaseTaskExecThread
      * @param taskInstance      task instance
@@ -101,11 +86,10 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         this.processService = SpringApplicationContext.getBean(ProcessService.class);
         this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
         this.processInstance = processInstance;
-        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-        this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
+        this.taskUpdateQueue = new TaskUpdateQueueImpl();
     }
 
     /**
@@ -123,87 +107,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         this.cancel = true;
     }
 
-    /**
-     * TODO 分发任务
-     * dispatch task to worker
-     * @param taskInstance
-     */
-    private Boolean dispatch(TaskInstance taskInstance){
-        TaskExecutionContext context = getTaskExecutionContext(taskInstance);
-        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
-        try {
-            return dispatcher.dispatch(executionContext);
-        } catch (ExecuteException e) {
-            logger.error("execute exception", e);
-            return false;
-        }
-
-    }
-
-    /**
-     * get TaskExecutionContext
-     *
-     * @param taskInstance taskInstance
-     * @return TaskExecutionContext
-     */
-    protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
-        taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
-
-        Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
-        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
-
-        // verify tenant is null
-        if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(ExecutionStatus.FAILURE,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    null,
-                    taskInstance.getId());
-            return null;
-        }
-        // set queue for process instance, user-specified queue takes precedence over tenant queue
-        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
-        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
-        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
-        taskInstance.setExecutePath(getExecLocalPath(taskInstance));
-
-        return TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
-                .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
-                .create();
-    }
-
-
-    /**
-     * get execute local path
-     *
-     * @return execute local path
-     */
-    private String getExecLocalPath(TaskInstance taskInstance){
-        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
-                taskInstance.getProcessDefine().getId(),
-                taskInstance.getProcessInstance().getId(),
-                taskInstance.getId());
-    }
-
-
-    /**
-     *  whehter tenant is null
-     * @param tenant tenant
-     * @param taskInstance taskInstance
-     * @return result
-     */
-    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
-        if(tenant == null){
-            logger.error("tenant not exists,process instance id : {},task instance id : {}",
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId());
-            return true;
-        }
-        return false;
-    }
     /**
      * submit master base task exec thread
      * @return TaskInstance
@@ -268,10 +171,20 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
                 logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
                 return true;
             }
-            logger.info("task ready to submit: {}" , taskInstance);
-            boolean submitTask = dispatch(taskInstance);
+            logger.info("task ready to submit: {}", taskInstance);
+
+            /**
+             *  taskPriorityInfo
+             */
+            String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
+                    processInstance.getId(),
+                    taskInstance.getProcessInstancePriority().getCode(),
+                    taskInstance.getId(),
+                    taskInstance.getWorkerGroup());
+
+            taskUpdateQueue.put(taskPriorityInfo);
             logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
-            return submitTask;
+            return true;
         }catch (Exception e){
             logger.error("submit task  Exception: ", e);
             logger.error("task error : %s", JSONUtils.toJson(taskInstance));
@@ -279,6 +192,33 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         }
     }
 
+
+    /**
+     *  buildTaskPriorityInfo
+     *
+     * @param processInstancePriority processInstancePriority
+     * @param processInstanceId processInstanceId
+     * @param taskInstancePriority taskInstancePriority
+     * @param taskInstanceId taskInstanceId
+     * @param workerGroup workerGroup
+     * @return TaskPriorityInfo
+     */
+    private String buildTaskPriorityInfo(int processInstancePriority,
+                                         int processInstanceId,
+                                         int taskInstancePriority,
+                                         int taskInstanceId,
+                                         String workerGroup){
+        return processInstancePriority +
+                UNDERLINE +
+                processInstanceId +
+                UNDERLINE +
+                taskInstancePriority +
+                UNDERLINE +
+                taskInstanceId +
+                UNDERLINE +
+                workerGroup;
+    }
+
     /**
      * submit wait complete
      * @return true

+ 4 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@@ -182,7 +182,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         }
         alreadyKilled = true;
 
-        TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(taskInstance.getId());
+        taskExecutionContext.setProcessId(taskInstance.getPid());
+
         ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
 
         Host host = Host.of(taskInstance.getHost());

+ 2 - 6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,10 +60,7 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private ZKWorkerClient zkWorkerClient = null;
 
-    /**
-     * task queue impl
-     */
-    protected ITaskQueue taskQueue;
+
 
     /**
      *  fetch task executor service
@@ -136,7 +132,7 @@ public class WorkerServer implements IStoppable {
 
         this.zkWorkerClient.init();
 
-        this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
+
 
         this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
 

+ 2 - 40
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,11 +97,6 @@ public class ProcessService {
     @Autowired
     private  ProjectMapper projectMapper;
 
-    /**
-     * task queue impl
-     */
-    @Autowired
-    private ITaskQueue taskQueue;
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      * @param logger logger
@@ -960,40 +954,7 @@ public class ProcessService {
         return taskInstance;
     }
 
-    /**
-     * submit task to queue
-     * @param taskInstance taskInstance
-     * @return whether submit task to queue success
-     */
-    public Boolean submitTaskToQueue(TaskInstance taskInstance) {
 
-        try{
-            if(taskInstance.isSubProcess()){
-                return true;
-            }
-            if(taskInstance.getState().typeIsFinished()){
-                logger.info(String.format("submit to task queue, but task [%s] state [%s] is already  finished. ", taskInstance.getName(), taskInstance.getState().toString()));
-                return true;
-            }
-            // task cannot submit when running
-            if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
-                logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
-                return true;
-            }
-            if(checkTaskExistsInTaskQueue(taskInstance)){
-                logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
-                return true;
-            }
-            logger.info("task ready to queue: {}" , taskInstance);
-            boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
-            logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
-            return insertQueueResult;
-        }catch (Exception e){
-            logger.error("submit task to queue Exception: ", e);
-            logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
-            return false;
-        }
-    }
 
     /**
      * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}...
@@ -1127,7 +1088,8 @@ public class ProcessService {
 
         String taskZkInfo = taskZkInfo(taskInstance);
 
-        return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo);
+//        return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo);
+        return false;
     }
 
     /**

+ 0 - 102
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java

@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-import java.util.List;
-import java.util.Set;
-
-public interface ITaskQueue {
-
-    /**
-     * take out all the elements
-     *
-     *
-     * @param key
-     * @return
-     */
-    List<String> getAllTasks(String key);
-
-    /**
-     * check if has a task
-     * @param key queue name
-     * @return true if has; false if not
-     */
-    boolean hasTask(String key);
-
-    /**
-     * check task exists in the task queue or not
-     *
-     * @param key queue name
-     * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-     * @return true if exists in the queue
-     */
-    boolean checkTaskExists(String key, String task);
-
-    /**
-     * add an element to the queue
-     *
-     * @param key  queue name
-     * @param value
-     */
-    boolean add(String key, String value);
-
-    /**
-     * an element pops out of the queue
-     *
-     * @param key  queue name
-     * @param n    how many elements to poll
-     * @return
-     */
-    List<String> poll(String key, int n);
-
-    /**
-     * remove a element from queue
-     * @param key
-     * @param value
-     */
-    void removeNode(String key, String value);
-
-    /**
-     * add an element to the set
-     *
-     * @param key
-     * @param value
-     */
-    void sadd(String key, String value);
-
-    /**
-     * delete the value corresponding to the key in the set
-     *
-     * @param key
-     * @param value
-     */
-    void srem(String key, String value);
-
-    /**
-     * gets all the elements of the set based on the key
-     *
-     * @param key
-     * @return
-     */
-    Set<String> smembers(String key);
-
-
-    /**
-     * clear the task queue for use by junit tests only
-     */
-    void delete();
-}

+ 2 - 2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java

@@ -40,11 +40,11 @@ public class TaskQueueFactory {
    *
    * @return instance
    */
-  public static ITaskQueue getTaskQueueInstance() {
+  public static TaskUpdateQueue getTaskQueueInstance() {
     String queueImplValue = CommonUtils.getQueueImplValue();
     if (StringUtils.isNotBlank(queueImplValue)) {
         logger.info("task queue impl use zookeeper ");
-        return SpringApplicationContext.getBean(TaskQueueZkImpl.class);
+        return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
     }else{
       logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
       System.exit(-1);

+ 0 - 375
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java

@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.*;
-
-/**
- * A singleton of a task queue implemented with zookeeper
- * tasks queue implementation
- */
-@Service
-public class TaskQueueZkImpl implements ITaskQueue {
-
-    private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
-
-    private final ZookeeperOperator zookeeperOperator;
-
-    @Autowired
-    public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
-        this.zookeeperOperator = zookeeperOperator;
-
-        try {
-            String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-            String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
-
-            for (String key : new String[]{tasksQueuePath,tasksKillPath}){
-                if (!zookeeperOperator.isExisted(key)){
-                    zookeeperOperator.persist(key, "");
-                    logger.info("create tasks queue parent node success : {}", key);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("create tasks queue parent node failure", e);
-        }
-    }
-
-
-    /**
-     * get all tasks from tasks queue
-     * @param key   task queue name
-     * @return
-     */
-    @Override
-    public List<String> getAllTasks(String key) {
-        try {
-            List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
-            return list;
-        } catch (Exception e) {
-            logger.error("get all tasks from tasks queue exception",e);
-        }
-        return Collections.emptyList();
-    }
-
-    /**
-     * check if has a task
-     * @param key queue name
-     * @return true if has; false if not
-     */
-    @Override
-    public boolean hasTask(String key) {
-        try {
-            return zookeeperOperator.hasChildren(getTasksPath(key));
-        } catch (Exception e) {
-            logger.error("check has task in tasks queue exception",e);
-        }
-        return false;
-    }
-
-    /**
-     * check task exists in the task queue or not
-     *
-     * @param key       queue name
-     * @param task      ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-     * @return true if exists in the queue
-     */
-    @Override
-    public boolean checkTaskExists(String key, String task) {
-        String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;
-
-        return zookeeperOperator.isExisted(taskPath);
-
-    }
-
-
-    /**
-     * add task to tasks queue
-     *
-     * @param key      task queue name
-     * @param value    ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
-     */
-    @Override
-    public boolean add(String key, String value){
-        try {
-            String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
-            zookeeperOperator.persist(taskIdPath, value);
-            return true;
-        } catch (Exception e) {
-            logger.error("add task to tasks queue exception",e);
-            return false;
-        }
-
-    }
-
-
-    /**
-     * An element pops out of the queue <p>
-     * note:
-     *   ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
-     *   The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
-     *
-     * @param  key  task queue name
-     * @param  tasksNum    how many elements to poll
-     * @return the task ids  to be executed
-     */
-    @Override
-    public List<String> poll(String key, int tasksNum) {
-        try{
-            List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
-
-            if(list != null && list.size() > 0){
-
-                String workerIp = OSUtils.getHost();
-                String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
-
-                int size = list.size();
-
-                Set<String> taskTreeSet = new TreeSet<>(new Comparator<String>() {
-                    @Override
-                    public int compare(String o1, String o2) {
-
-                        String s1 = o1;
-                        String s2 = o2;
-                        String[] s1Array = s1.split(Constants.UNDERLINE);
-                        if(s1Array.length>4){
-                            // warning: if this length > 5, need to be changed
-                            s1 = s1.substring(0, s1.lastIndexOf(Constants.UNDERLINE) );
-                        }
-
-                        String[] s2Array = s2.split(Constants.UNDERLINE);
-                        if(s2Array.length>4){
-                            // warning: if this length > 5, need to be changed
-                            s2 = s2.substring(0, s2.lastIndexOf(Constants.UNDERLINE) );
-                        }
-
-                        return s1.compareTo(s2);
-                    }
-                });
-
-                for (int i = 0; i < size; i++) {
-
-                    String taskDetail = list.get(i);
-                    String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
-
-                    //forward compatibility
-                    if(taskDetailArrs.length >= 4){
-
-                        //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-                        String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
-                        if(taskDetailArrs.length > 4){
-                            String taskHosts = taskDetailArrs[4];
-
-                            //task can assign to any worker host if equals default ip value of worker server
-                            if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
-                                String[] taskHostsArr = taskHosts.split(Constants.COMMA);
-                                if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
-                                    continue;
-                                }
-                            }
-                            formatTask += Constants.UNDERLINE + taskDetailArrs[4];
-                        }
-                        taskTreeSet.add(formatTask);
-                    }
-                }
-
-                List<String> tasksList = getTasksListFromTreeSet(tasksNum, taskTreeSet);
-
-                logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size());
-
-                return tasksList;
-            }else{
-                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            }
-
-        } catch (Exception e) {
-            logger.error("add task to tasks queue exception",e);
-        }
-        return Collections.emptyList();
-    }
-
-
-    /**
-     * get task list from tree set
-     *
-     * @param tasksNum
-     * @param taskTreeSet
-     */
-    public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
-        Iterator<String> iterator = taskTreeSet.iterator();
-        int j = 0;
-        List<String> tasksList = new ArrayList<>(tasksNum);
-        while(iterator.hasNext()){
-            if(j++ >= tasksNum){
-                break;
-            }
-            String task = iterator.next();
-            tasksList.add(getOriginTaskFormat(task));
-        }
-        return tasksList;
-    }
-
-    /**
-     * format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-     * processInstanceId and task id need to be convert to int.
-     * @param formatTask
-     * @return
-     */
-    private String getOriginTaskFormat(String formatTask){
-        String[] taskArray = formatTask.split(Constants.UNDERLINE);
-        if(taskArray.length< 4){
-            return formatTask;
-        }
-        int processInstanceId = Integer.parseInt(taskArray[1]);
-        int taskId = Integer.parseInt(taskArray[3]);
-
-        StringBuilder sb = new StringBuilder(50);
-        String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[2], taskId);
-
-        sb.append(destTask);
-
-        if(taskArray.length > 4){
-            for(int index = 4; index < taskArray.length; index++){
-                sb.append(Constants.UNDERLINE).append(taskArray[index]);
-            }
-        }
-        return sb.toString();
-    }
-
-    @Override
-    public void removeNode(String key, String nodeValue){
-
-        String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
-        String taskIdPath = tasksQueuePath + nodeValue;
-        logger.info("removeNode task {}", taskIdPath);
-        try{
-            zookeeperOperator.remove(taskIdPath);
-
-        }catch(Exception e){
-            logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
-        }
-
-    }
-
-
-
-    /**
-     * In order to be compatible with redis implementation
-     *
-     * To be compatible with the redis implementation, add an element to the set
-     * @param key   The key is the kill/cancel queue path name
-     * @param value host-taskId  The name of the zookeeper node
-     */
-    @Override
-    public void sadd(String key,String value) {
-        try {
-
-            if(value != null && value.trim().length() > 0){
-                String path = getTasksPath(key) + Constants.SINGLE_SLASH;
-                if(!zookeeperOperator.isExisted(path + value)){
-                    zookeeperOperator.persist(path + value,value);
-                    logger.info("add task:{} to tasks set ",value);
-                } else{
-                    logger.info("task {} exists in tasks set ",value);
-                }
-
-            }else{
-                logger.warn("add host-taskId:{} to tasks set is empty ",value);
-            }
-
-        } catch (Exception e) {
-            logger.error("add task to tasks set exception",e);
-        }
-    }
-
-
-    /**
-     * delete the value corresponding to the key in the set
-     * @param key   The key is the kill/cancel queue path name
-     * @param value host-taskId-taskType The name of the zookeeper node
-     */
-    @Override
-    public void srem(String key, String value) {
-        try{
-            String path = getTasksPath(key) + Constants.SINGLE_SLASH;
-            zookeeperOperator.remove(path + value);
-
-        }catch(Exception e){
-            logger.error(String.format("delete task:" + value + " exception"),e);
-        }
-    }
-
-
-    /**
-     * Gets all the elements of the set based on the key
-     * @param key  The key is the kill/cancel queue path name
-     * @return
-     */
-    @Override
-    public Set<String> smembers(String key) {
-        try {
-            List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
-            return new HashSet<>(list);
-        } catch (Exception e) {
-            logger.error("get all tasks from tasks queue exception",e);
-        }
-        return Collections.emptySet();
-    }
-
-    /**
-     * Clear the task queue of zookeeper node
-     */
-    @Override
-    public void delete(){
-        try {
-            String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-            String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
-
-            for (String key : new String[]{tasksQueuePath,tasksKillPath}){
-                if (zookeeperOperator.isExisted(key)){
-                    List<String> list = zookeeperOperator.getChildrenKeys(key);
-                    for (String task : list) {
-                        zookeeperOperator.remove(key + Constants.SINGLE_SLASH + task);
-                        logger.info("delete task from tasks queue : {}/{} ", key, task);
-                    }
-                }
-            }
-
-        } catch (Exception e) {
-            logger.error("delete all tasks in tasks queue failure", e);
-        }
-    }
-
-    /**
-     * Get the task queue path
-     * @param key  task queue name
-     * @return
-     */
-    public String getTasksPath(String key){
-        return zookeeperOperator.getZookeeperConfig().getDsRoot() + Constants.SINGLE_SLASH + key;
-    }
-
-}

+ 23 - 27
dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java

@@ -14,35 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package queue;
+package org.apache.dolphinscheduler.service.queue;
 
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
-import org.junit.*;
 
-/**
- * base task queue test for only start zk server once
- */
-@Ignore
-public class BaseTaskQueueTest {
+public interface TaskUpdateQueue {
 
-    protected static ITaskQueue tasksQueue = null;
+    /**
+     * put task info
+     *
+     * @param taskInfo taskInfo
+     * @throws Exception
+     */
+    void put(String taskInfo) throws Exception;
 
-    @BeforeClass
-    public static void setup() {
-        ZKServer.start();
-        tasksQueue = TaskQueueFactory.getTaskQueueInstance();
-        //clear all data
-        tasksQueue.delete();
-    }
+    /**
+     * take taskInfo
+     * @return taskInfo
+     * @throws Exception
+     */
+    String take()throws Exception;
 
-    @AfterClass
-    public static void tearDown() {
-        tasksQueue.delete();
-        ZKServer.stop();
-    }
-    @Test
-    public void tasksQueueNotNull(){
-        Assert.assertNotNull(tasksQueue);
-    }
-}
+    /**
+     * size
+     *
+     * @return size
+     * @throws Exception
+     */
+    int size() throws Exception;
+}

+ 115 - 0
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java

@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.queue;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+/**
+ * A singleton of a task queue implemented with zookeeper
+ * tasks queue implementation
+ */
+@Service
+public class TaskUpdateQueueImpl implements TaskUpdateQueue {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueImpl.class);
+
+    /**
+     * queue size
+     */
+    private static final Integer QUEUE_MAX_SIZE = 100;
+
+    /**
+     * queue
+     */
+    private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+
+    /**
+     * put task takePriorityInfo
+     *
+     * @param taskPriorityInfo takePriorityInfo
+     * @throws Exception
+     */
+    @Override
+    public void put(String taskPriorityInfo) throws Exception {
+
+        if (QUEUE_MAX_SIZE.equals(queue.size())){
+            //TODO need persist db , then load from db to queue when queue size is zero
+            logger.error("queue is full...");
+            return;
+        }
+        queue.put(taskPriorityInfo);
+    }
+
+    /**
+     * take taskInfo
+     * @return taskInfo
+     * @throws Exception
+     */
+    @Override
+    public String take() throws Exception {
+        return queue.take();
+    }
+
+    /**
+     * queue size
+     * @return size
+     * @throws Exception
+     */
+    @Override
+    public int size() throws Exception {
+        return queue.size();
+    }
+
+    /**
+     * TaskInfoComparator
+     */
+    private class TaskInfoComparator implements Comparator<String>{
+
+        /**
+         * compare o1 o2
+         * @param o1 o1
+         * @param o2 o2
+         * @return compare result
+         */
+        @Override
+        public int compare(String o1, String o2) {
+            String s1 = o1;
+            String s2 = o2;
+            String[] s1Array = s1.split(UNDERLINE);
+            if(s1Array.length > TASK_INFO_LENGTH){
+                // warning: if this length > 5, need to be changed
+                s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
+            }
+
+            String[] s2Array = s2.split(UNDERLINE);
+            if(s2Array.length > TASK_INFO_LENGTH){
+                // warning: if this length > 5, need to be changed
+                s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
+            }
+
+            return s1.compareTo(s2);
+        }
+    }
+}

+ 0 - 229
dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java

@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package queue;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-/**
- * task queue test
- */
-@Ignore
-public class TaskQueueZKImplTest extends BaseTaskQueueTest  {
-
-    @Before
-    public void before(){
-
-        //clear all data
-        tasksQueue.delete();
-    }
-
-    @After
-    public void after(){
-        //clear all data
-        tasksQueue.delete();
-    }
-
-    /**
-     * test take out all the elements
-     */
-    @Test
-    public  void getAllTasks(){
-
-        //add
-        init();
-        // get all
-        List<String> allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertEquals(allTasks.size(),2);
-        //delete all
-        tasksQueue.delete();
-        allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertEquals(allTasks.size(),0);
-    }
-    @Test
-    public void hasTask(){
-        init();
-        boolean hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertTrue(hasTask);
-        //delete all
-        tasksQueue.delete();
-        hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-        assertFalse(hasTask);
-    }
-    /**
-     * test check task exists in the task queue or not
-     */
-    @Test
-    public  void checkTaskExists(){
-
-        String task= "1_0_1_1_-1";
-        //add
-        init();
-        // check Exist true
-        boolean taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
-        assertTrue(taskExists);
-
-        //remove task
-        tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        // check Exist false
-        taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
-        assertFalse(taskExists);
-    }
-
-    /**
-     * test add  element to the queue
-     */
-    @Test
-    public void add(){
-
-        //add
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" + IpUtils.ipToLong(OSUtils.getHost()) + 10);
-
-        List<String> tasks = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1);
-
-        if(tasks.size() <= 0){
-            return;
-        }
-
-        //pop
-        String node1 = tasks.get(0);
-        assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
-    }
-
-    /**
-     *  test  element pops out of the queue
-     */
-    @Test
-    public  void poll(){
-        
-        //add
-        init();
-        List<String> taskList = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2);
-        assertEquals(taskList.size(),2);
-
-        assertEquals(taskList.get(0),"0_1_1_1_-1");
-        assertEquals(taskList.get(1),"1_0_1_1_-1");
-    }
-
-    /**
-     * test remove  element from queue
-     */
-    @Test
-    public void removeNode(){
-        String task = "1_0_1_1_-1";
-        //add
-        init();
-        tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task));
-    }
-
-    /**
-     * test add an element to the set
-     */
-    @Test
-    public void sadd(){
-
-        String task = "1_0_1_1_-1";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
-    }
-
-
-    /**
-     * test delete the value corresponding to the key in the set
-     */
-    @Test
-    public void srem(){
-
-        String task = "1_0_1_1_-1";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
-        //remove and get size
-        tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
-    }
-
-    /**
-     * test gets all the elements of the set based on the key
-     */
-    @Test
-    public void smembers(){
-
-        //first init
-        assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
-        //add
-        String task = "1_0_1_1_-1";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
-        //add
-        task = "0_1_1_1_";
-        tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-        //check size
-        assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2);
-    }
-
-
-    /**
-     * init data
-     */
-    private void init(){
-        //add
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
-        tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
-    }
-
-
-
-    /**
-     * test one million data from zookeeper queue
-     */
-    @Ignore
-    @Test
-    public void extremeTest(){
-        int total = 30 * 10000;
-
-        for(int i = 0; i < total; i++) {
-            for(int j = 0; j < total; j++) {
-                //${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-                //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
-                String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j, j == 0 ?  0 : j + new Random().nextInt(100));
-                tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, formatTask);
-            }
-        }
-
-        String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1).get(0);
-        assertEquals(node1,"0");
-
-    }
-
-}

+ 59 - 0
dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TaskUpdateQueueTest {
+
+    /**
+     * test put
+     */
+    @Test
+    public void testQueue() throws Exception{
+
+        // ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+
+        /**
+         * 1_1_2_1_default
+         * 1_1_2_2_default
+         * 1_1_0_3_default
+         * 1_1_0_4_default
+         */
+
+        String taskInfo1 = "1_1_2_1_default";
+        String taskInfo2 = "1_1_2_2_default";
+        String taskInfo3 = "1_1_0_3_default";
+        String taskInfo4 = "1_1_0_4_default";
+
+        TaskUpdateQueue queue = new TaskUpdateQueueImpl();
+        queue.put(taskInfo1);
+        queue.put(taskInfo2);
+        queue.put(taskInfo3);
+        queue.put(taskInfo4);
+
+        assertEquals("1_1_0_3_default", queue.take());
+        assertEquals("1_1_0_4_default", queue.take());
+        assertEquals("1_1_2_1_default",queue.take());
+        assertEquals("1_1_2_2_default",queue.take());
+    }
+}