Browse Source

[Feature-4138][Master] dispatch workgroup error add sleep time (#4139)

* When there are tasks with assignment failure and the number of tasks in the current task queue is less than 10, sleep for 1 second

* When there are tasks with assignment failure and the number of tasks in the current task queue is less than 10, sleep for 1 second

* fix code smell & code style

* fix code smell & code style

Co-authored-by: zhanglong <zhanglong@ysstech.com>
BoYiZhang 4 years ago
parent
commit
b3120a74d2

+ 27 - 19
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -64,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -136,9 +137,17 @@ public class TaskPriorityQueueConsumer extends Thread {
                         failedDispatchTasks.add(taskPriorityInfo);
                     }
                 }
-                for (String dispatchFailedTask : failedDispatchTasks) {
-                    taskPriorityQueue.put(dispatchFailedTask);
+                if (!failedDispatchTasks.isEmpty()) {
+                    for (String dispatchFailedTask : failedDispatchTasks) {
+                        taskPriorityQueue.put(dispatchFailedTask);
+                    }
+                    // If there are tasks in a cycle that cannot find the worker group,
+                    // sleep for 1 second
+                    if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
+                        TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                    }
                 }
+
             } catch (Exception e) {
                 logger.error("dispatcher task error", e);
             }
@@ -151,7 +160,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * @param taskInstanceId taskInstanceId
      * @return result
      */
-    private boolean dispatch(int taskInstanceId) {
+    protected boolean dispatch(int taskInstanceId) {
         boolean result = false;
         try {
             TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
@@ -224,7 +233,6 @@ public class TaskPriorityQueueConsumer extends Thread {
         // SQL task
         if (taskType == TaskType.SQL) {
             setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
-
         }
 
         // DATAX task
@@ -271,22 +279,22 @@ public class TaskPriorityQueueConsumer extends Thread {
      * @param dataxTaskExecutionContext dataxTaskExecutionContext
      * @param taskNode                  taskNode
      */
-    private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+    protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
         DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
 
-        DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
-        DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+        DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource());
+        DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
 
-        if (dataSource != null) {
+        if (dbSource != null) {
             dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
-            dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
+            dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
         }
 
-        if (dataTarget != null) {
+        if (dbTarget != null) {
             dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
-            dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
+            dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
         }
     }
 
@@ -374,7 +382,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * @param taskInstance taskInstance
      * @return result
      */
-    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+    protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
         if (tenant == null) {
             logger.error("tenant not exists,process instance id : {},task instance id : {}",
                 taskInstance.getProcessInstance().getId(),
@@ -387,8 +395,8 @@ public class TaskPriorityQueueConsumer extends Thread {
     /**
      * get resource map key is full name and value is tenantCode
      */
-    private Map<String, String> getResourceFullNames(TaskNode taskNode) {
-        Map<String, String> resourceMap = new HashMap<>();
+    protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+        Map<String, String> resourcesMap = new HashMap<>();
         AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
 
         if (baseParam != null) {
@@ -400,7 +408,7 @@ public class TaskPriorityQueueConsumer extends Thread {
                 if (CollectionUtils.isNotEmpty(oldVersionResources)) {
 
                     oldVersionResources.forEach(
-                        (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
+                        (t) -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
                     );
                 }
 
@@ -413,12 +421,12 @@ public class TaskPriorityQueueConsumer extends Thread {
 
                     List<Resource> resources = processService.listResourceByIds(resourceIds);
                     resources.forEach(
-                        (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
+                        (t) -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
                     );
                 }
             }
         }
 
-        return resourceMap;
+        return resourcesMap;
     }
 }

File diff suppressed because it is too large
+ 429 - 30
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java