Browse Source

the process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 bug fix (#775)

* mission log disorder,bug #751 fix

* the log path of the task and the log path of the task instance are different. The log cannot be viewed. #723

* the log path of the task and the log path of the task instance are different. The log cannot be viewed. #723 bug fix

* after starting kerberos authentication, tgt expires after one day,bug #742 fix

* log pattern modify

* LoggerServer remove comment code and ShellCommandExecutor modify

* PythonCommandExecutor modify

* Concurrent task log bug #730 fix

* remove invalid commit

* The process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 fix bug

* The process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 re fix

* The process instance is deleted, the task corresponding to the zk queue still exists, and the task is squeezed. #754 bug fix
乔占卫 5 years ago
parent
commit
a6e2d1eb82

+ 50 - 18
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@@ -32,10 +32,7 @@ import cn.escheduler.common.model.TaskNodeRelation;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.queue.ITaskQueue;
 import cn.escheduler.common.queue.TaskQueueFactory;
-import cn.escheduler.common.utils.CollectionUtils;
-import cn.escheduler.common.utils.DateUtils;
-import cn.escheduler.common.utils.JSONUtils;
-import cn.escheduler.common.utils.ParameterUtils;
+import cn.escheduler.common.utils.*;
 import cn.escheduler.common.utils.placeholder.BusinessTimeUtils;
 import cn.escheduler.dao.ProcessDao;
 import cn.escheduler.dao.mapper.*;
@@ -493,29 +490,64 @@ public class ProcessInstanceService extends BaseDAGService {
             return result;
         }
 
-        int delete = processDao.deleteWorkProcessInstanceById(processInstanceId);
-        processDao.deleteAllSubWorkProcessByParentId(processInstanceId);
-        processDao.deleteWorkProcessMapByParentId(processInstanceId);
+        // delete zk queue
+        if (CollectionUtils.isNotEmpty(taskInstanceList)){
+            for (TaskInstance taskInstance : taskInstanceList){
+                // task instance priority
+                int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
+
+                StringBuilder nodeValueSb = new StringBuilder(100);
+                nodeValueSb.append(processInstancePriority)
+                        .append(UNDERLINE)
+                        .append(processInstanceId)
+                        .append(UNDERLINE)
+                        .append(taskInstancePriority)
+                        .append(UNDERLINE)
+                        .append(taskInstance.getId())
+                        .append(UNDERLINE);
+
+                int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
+                WorkerGroup workerGroup = workerGroupMapper.queryById(taskWorkerGroupId);
+
+                if(workerGroup == null){
+                    nodeValueSb.append(DEFAULT_WORKER_ID);
+                }else {
+
+                    String ips = workerGroup.getIpList();
+                    StringBuilder ipSb = new StringBuilder(100);
+                    String[] ipArray = ips.split(COMMA);
+
+                    for (String ip : ipArray) {
+                        long ipLong = IpUtils.ipToLong(ip);
+                        ipSb.append(ipLong).append(COMMA);
+                    }
 
-        if (delete > 0) {
-            if (CollectionUtils.isNotEmpty(taskInstanceList)){
-                for (TaskInstance taskInstance : taskInstanceList){
-                    // task instance priority
-                    int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
-                    String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId();
-                    try {
-                        logger.info("delete task queue node : {}",nodeValue);
-                        tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue);
-                    }catch (Exception e){
-                        logger.error("delete task queue node : {}", nodeValue);
+                    if(ipSb.length() > 0) {
+                        ipSb.deleteCharAt(ipSb.length() - 1);
                     }
+                    nodeValueSb.append(ipSb);
+                }
+
+                try {
+                    logger.info("delete task queue node : {}",nodeValueSb.toString());
+                    tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValueSb.toString());
+                }catch (Exception e){
+                    logger.error("delete task queue node : {}", nodeValueSb.toString());
                 }
             }
+        }
 
+        // delete database cascade
+        int delete = processDao.deleteWorkProcessInstanceById(processInstanceId);
+        processDao.deleteAllSubWorkProcessByParentId(processInstanceId);
+        processDao.deleteWorkProcessMapByParentId(processInstanceId);
+
+        if (delete > 0) {
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
         }
+
         return result;
     }