|
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
|
|
|
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
|
|
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
|
|
|
+import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
|
|
|
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
|
|
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
|
|
|
|
|
@@ -42,8 +43,8 @@ public class WorkerManagerThread implements Runnable {
|
|
|
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
|
|
|
|
|
|
private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
|
|
|
-
|
|
|
private final WorkerExecService workerExecService;
|
|
|
+ private final WorkerConfig workerConfig;
|
|
|
|
|
|
private final int workerExecThreads;
|
|
|
|
|
@@ -54,6 +55,7 @@ public class WorkerManagerThread implements Runnable {
|
|
|
new ConcurrentHashMap<>();
|
|
|
|
|
|
public WorkerManagerThread(WorkerConfig workerConfig) {
|
|
|
+ this.workerConfig = workerConfig;
|
|
|
workerExecThreads = workerConfig.getExecThreads();
|
|
|
this.waitSubmitQueue = new DelayQueue<>();
|
|
|
workerExecService = new WorkerExecService(
|
|
@@ -95,6 +97,10 @@ public class WorkerManagerThread implements Runnable {
|
|
|
}
|
|
|
|
|
|
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
|
|
|
+ if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
|
|
|
+ return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
|
|
|
+ }
|
|
|
+
|
|
|
if (waitSubmitQueue.size() > workerExecThreads) {
|
|
|
logger.warn("Wait submit queue is full, will retry submit task later");
|
|
|
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
|