|
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
|
|
|
|
|
|
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
|
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
|
|
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
|
|
|
import org.apache.dolphinscheduler.common.thread.Stopper;
|
|
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
|
@@ -139,6 +140,7 @@ public class FetchTaskThread implements Runnable{
|
|
|
logger.info("worker start fetch tasks...");
|
|
|
while (Stopper.isRunning()){
|
|
|
InterProcessMutex mutex = null;
|
|
|
+ String currentTaskQueueStr = null;
|
|
|
try {
|
|
|
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
|
|
|
//check memory and cpu usage and threads
|
|
@@ -165,6 +167,9 @@ public class FetchTaskThread implements Runnable{
|
|
|
List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);
|
|
|
|
|
|
for(String taskQueueStr : taskQueueStrArr){
|
|
|
+
|
|
|
+ currentTaskQueueStr = taskQueueStr;
|
|
|
+
|
|
|
if (StringUtils.isEmpty(taskQueueStr)) {
|
|
|
continue;
|
|
|
}
|
|
@@ -184,7 +189,7 @@ public class FetchTaskThread implements Runnable{
|
|
|
// verify task instance is null
|
|
|
if (verifyTaskInstanceIsNull(taskInstance)) {
|
|
|
logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
|
|
|
- removeNodeFromTaskQueue(taskQueueStr);
|
|
|
+ processErrorTask(taskQueueStr);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -192,13 +197,17 @@ public class FetchTaskThread implements Runnable{
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
|
|
|
- taskInstance.getProcessDefine().getUserId());
|
|
|
+ // if process definition is null ,process definition already deleted
|
|
|
+ int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
|
|
|
+
|
|
|
+ Tenant tenant = processDao.getTenantForProcess(
|
|
|
+ taskInstance.getProcessInstance().getTenantId(),
|
|
|
+ userId);
|
|
|
|
|
|
// verify tenant is null
|
|
|
if (verifyTenantIsNull(tenant)) {
|
|
|
logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
|
|
|
- removeNodeFromTaskQueue(taskQueueStr);
|
|
|
+ processErrorTask(taskQueueStr);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -232,6 +241,7 @@ public class FetchTaskThread implements Runnable{
|
|
|
}
|
|
|
|
|
|
}catch (Exception e){
|
|
|
+ processErrorTask(currentTaskQueueStr);
|
|
|
logger.error("fetch task thread failure" ,e);
|
|
|
}finally {
|
|
|
AbstractZKClient.releaseMutex(mutex);
|
|
@@ -239,6 +249,26 @@ public class FetchTaskThread implements Runnable{
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * process error task
|
|
|
+ *
|
|
|
+ * @param taskQueueStr task queue str
|
|
|
+ */
|
|
|
+ private void processErrorTask(String taskQueueStr){
|
|
|
+ // remove from zk
|
|
|
+ removeNodeFromTaskQueue(taskQueueStr);
|
|
|
+
|
|
|
+ if (taskInstance != null){
|
|
|
+ processDao.changeTaskState(ExecutionStatus.FAILURE,
|
|
|
+ taskInstance.getStartTime(),
|
|
|
+ taskInstance.getHost(),
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ taskInstId);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* remove node from task queue
|
|
|
*
|
|
@@ -269,8 +299,7 @@ public class FetchTaskThread implements Runnable{
|
|
|
*/
|
|
|
private boolean verifyTenantIsNull(Tenant tenant) {
|
|
|
if(tenant == null){
|
|
|
- logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
|
|
|
- taskInstance.getProcessDefine().getId(),
|
|
|
+ logger.error("tenant not exists,process instance id : {},task instance id : {}",
|
|
|
taskInstance.getProcessInstance().getId(),
|
|
|
taskInstance.getId());
|
|
|
return true;
|