|
@@ -14,6 +14,7 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
+
|
|
|
package org.apache.dolphinscheduler.server.utils;
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
@@ -22,19 +23,21 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation;
|
|
|
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
|
|
import org.apache.dolphinscheduler.common.model.DateInterval;
|
|
|
import org.apache.dolphinscheduler.common.model.DependentItem;
|
|
|
-import org.apache.dolphinscheduler.common.model.TaskNode;
|
|
|
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
|
|
import org.apache.dolphinscheduler.common.utils.DependentUtils;
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
|
|
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
|
|
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.util.*;
|
|
|
-
|
|
|
/**
|
|
|
* dependent item execute
|
|
|
*/
|
|
@@ -74,7 +77,7 @@ public class DependentExecute {
|
|
|
* @param itemList item list
|
|
|
* @param relation relation
|
|
|
*/
|
|
|
- public DependentExecute(List<DependentItem> itemList, DependentRelation relation){
|
|
|
+ public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
|
|
|
this.dependItemList = itemList;
|
|
|
this.relation = relation;
|
|
|
}
|
|
@@ -85,9 +88,9 @@ public class DependentExecute {
|
|
|
* @param currentTime current time
|
|
|
* @return DependResult
|
|
|
*/
|
|
|
- private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
|
|
|
+ private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
|
|
|
List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
|
|
|
- return calculateResultForTasks(dependentItem, dateIntervals );
|
|
|
+ return calculateResultForTasks(dependentItem, dateIntervals);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -100,19 +103,18 @@ public class DependentExecute {
|
|
|
List<DateInterval> dateIntervals) {
|
|
|
|
|
|
DependResult result = DependResult.FAILED;
|
|
|
- for(DateInterval dateInterval : dateIntervals){
|
|
|
- ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
|
|
|
- dateInterval);
|
|
|
- if(processInstance == null){
|
|
|
+ for (DateInterval dateInterval : dateIntervals) {
|
|
|
+ ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval);
|
|
|
+ if (processInstance == null) {
|
|
|
return DependResult.WAITING;
|
|
|
}
|
|
|
// need to check workflow for updates, so get all task and check the task state
|
|
|
- if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
|
|
|
+ if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
|
|
|
result = dependResultByProcessInstance(processInstance);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
|
|
|
}
|
|
|
- if(result != DependResult.SUCCESS){
|
|
|
+ if (result != DependResult.SUCCESS) {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -123,11 +125,11 @@ public class DependentExecute {
|
|
|
* depend type = depend_all
|
|
|
* @return
|
|
|
*/
|
|
|
- private DependResult dependResultByProcessInstance(ProcessInstance processInstance){
|
|
|
- if(!processInstance.getState().typeIsFinished()){
|
|
|
+ private DependResult dependResultByProcessInstance(ProcessInstance processInstance) {
|
|
|
+ if (!processInstance.getState().typeIsFinished()) {
|
|
|
return DependResult.WAITING;
|
|
|
}
|
|
|
- if(processInstance.getState().typeIsSuccess()){
|
|
|
+ if (processInstance.getState().typeIsSuccess()) {
|
|
|
return DependResult.SUCCESS;
|
|
|
}
|
|
|
return DependResult.FAILED;
|
|
@@ -144,22 +146,22 @@ public class DependentExecute {
|
|
|
TaskInstance taskInstance = null;
|
|
|
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
|
|
|
|
|
|
- for(TaskInstance task : taskInstanceList){
|
|
|
- if(task.getName().equals(taskName)){
|
|
|
+ for (TaskInstance task : taskInstanceList) {
|
|
|
+ if (task.getName().equals(taskName)) {
|
|
|
taskInstance = task;
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if(taskInstance == null){
|
|
|
+ if (taskInstance == null) {
|
|
|
// cannot find task in the process instance
|
|
|
// maybe because process instance is running or failed.
|
|
|
- if(processInstance.getState().typeIsFinished()){
|
|
|
+ if (processInstance.getState().typeIsFinished()) {
|
|
|
result = DependResult.FAILED;
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
return DependResult.WAITING;
|
|
|
}
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
result = getDependResultByState(taskInstance.getState());
|
|
|
}
|
|
|
|
|
@@ -177,7 +179,7 @@ public class DependentExecute {
|
|
|
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
|
|
|
|
|
|
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime());
|
|
|
- if(runningProcess != null){
|
|
|
+ if (runningProcess != null) {
|
|
|
return runningProcess;
|
|
|
}
|
|
|
|
|
@@ -189,15 +191,14 @@ public class DependentExecute {
|
|
|
definitionId, dateInterval
|
|
|
);
|
|
|
|
|
|
- if(lastManualProcess ==null){
|
|
|
+ if (lastManualProcess == null) {
|
|
|
return lastSchedulerProcess;
|
|
|
}
|
|
|
- if(lastSchedulerProcess == null){
|
|
|
+ if (lastSchedulerProcess == null) {
|
|
|
return lastManualProcess;
|
|
|
}
|
|
|
|
|
|
- return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime()))?
|
|
|
- lastManualProcess : lastSchedulerProcess;
|
|
|
+ return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime())) ? lastManualProcess : lastSchedulerProcess;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -207,11 +208,11 @@ public class DependentExecute {
|
|
|
*/
|
|
|
private DependResult getDependResultByState(ExecutionStatus state) {
|
|
|
|
|
|
- if(!state.typeIsFinished()){
|
|
|
+ if (!state.typeIsFinished()) {
|
|
|
return DependResult.WAITING;
|
|
|
- }else if(state.typeIsSuccess()){
|
|
|
+ } else if (state.typeIsSuccess()) {
|
|
|
return DependResult.SUCCESS;
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
return DependResult.FAILED;
|
|
|
}
|
|
|
}
|
|
@@ -223,11 +224,11 @@ public class DependentExecute {
|
|
|
*/
|
|
|
private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
|
|
|
|
|
|
- if(state.typeIsRunning()
|
|
|
+ if (state.typeIsRunning()
|
|
|
|| state == ExecutionStatus.SUBMITTED_SUCCESS
|
|
|
- || state == ExecutionStatus.WAITTING_THREAD){
|
|
|
+ || state == ExecutionStatus.WAITTING_THREAD) {
|
|
|
return DependResult.WAITING;
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
return DependResult.FAILED;
|
|
|
}
|
|
|
}
|
|
@@ -237,8 +238,8 @@ public class DependentExecute {
|
|
|
* @param currentTime current time
|
|
|
* @return boolean
|
|
|
*/
|
|
|
- public boolean finish(Date currentTime){
|
|
|
- if(modelDependResult == DependResult.WAITING){
|
|
|
+ public boolean finish(Date currentTime) {
|
|
|
+ if (modelDependResult == DependResult.WAITING) {
|
|
|
modelDependResult = getModelDependResult(currentTime);
|
|
|
return false;
|
|
|
}
|
|
@@ -250,13 +251,13 @@ public class DependentExecute {
|
|
|
* @param currentTime current time
|
|
|
* @return DependResult
|
|
|
*/
|
|
|
- public DependResult getModelDependResult(Date currentTime){
|
|
|
+ public DependResult getModelDependResult(Date currentTime) {
|
|
|
|
|
|
List<DependResult> dependResultList = new ArrayList<>();
|
|
|
|
|
|
- for(DependentItem dependentItem : dependItemList){
|
|
|
+ for (DependentItem dependentItem : dependItemList) {
|
|
|
DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
|
|
|
- if(dependResult != DependResult.WAITING){
|
|
|
+ if (dependResult != DependResult.WAITING) {
|
|
|
dependResultMap.put(dependentItem.getKey(), dependResult);
|
|
|
}
|
|
|
dependResultList.add(dependResult);
|
|
@@ -273,15 +274,15 @@ public class DependentExecute {
|
|
|
* @param currentTime current time
|
|
|
* @return DependResult
|
|
|
*/
|
|
|
- private DependResult getDependResultForItem(DependentItem item, Date currentTime){
|
|
|
+ private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
|
|
|
String key = item.getKey();
|
|
|
- if(dependResultMap.containsKey(key)){
|
|
|
+ if (dependResultMap.containsKey(key)) {
|
|
|
return dependResultMap.get(key);
|
|
|
}
|
|
|
return getDependentResultForItem(item, currentTime);
|
|
|
}
|
|
|
|
|
|
- public Map<String, DependResult> getDependResultMap(){
|
|
|
+ public Map<String, DependResult> getDependResultMap() {
|
|
|
return dependResultMap;
|
|
|
}
|
|
|
|