Преглед на файлове

Merge pull request #489 from qiaozhanwei/branch-1.0.2

qianfan task result add judge update
乔占卫 преди 5 години
родител
ревизия
3286f15e87
променени са 1 файла, в които са добавени 60 реда и са изтрити 14 реда
  1. 60 14
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

+ 60 - 14
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@@ -25,15 +25,19 @@ import cn.escheduler.common.model.TaskNode;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
 import cn.escheduler.common.task.TaskTimeoutParameter;
-import cn.escheduler.common.utils.CommonUtils;
-import cn.escheduler.common.utils.DateUtils;
-import cn.escheduler.common.utils.HadoopUtils;
-import cn.escheduler.common.utils.TaskParametersUtils;
+import cn.escheduler.common.task.mr.MapreduceParameters;
+import cn.escheduler.common.task.procedure.ProcedureParameters;
+import cn.escheduler.common.task.python.PythonParameters;
+import cn.escheduler.common.task.shell.ShellParameters;
+import cn.escheduler.common.task.spark.SparkParameters;
+import cn.escheduler.common.task.sql.SqlParameters;
+import cn.escheduler.common.utils.*;
 import cn.escheduler.dao.ProcessDao;
 import cn.escheduler.dao.TaskRecordDao;
 import cn.escheduler.dao.model.ProcessInstance;
 import cn.escheduler.dao.model.TaskInstance;
 import cn.escheduler.server.utils.LoggerUtils;
+import cn.escheduler.server.utils.ParamUtils;
 import cn.escheduler.server.worker.log.TaskLogger;
 import cn.escheduler.server.worker.task.AbstractTask;
 import cn.escheduler.server.worker.task.TaskManager;
@@ -144,6 +148,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
 
             TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
 
+
             List<String> projectRes = createProjectResFiles(taskNode);
 
             // copy hdfs file to local
@@ -205,17 +210,25 @@ public class TaskScheduleThread implements Callable<Boolean> {
                 // task recor flat : if true , start up qianfan
                 if (TaskRecordDao.getTaskRecordFlag()
                         && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
-                    Date scheduleTime = processInstance.getScheduleTime();
-                    if(scheduleTime == null){
-                        scheduleTime = processInstance.getStartTime();
-                    }
 
-                    // process exec time : yyyyMMdd format
-                    String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD);
-                    TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate);
-                    logger.info("task record status : {}",taskRecordState);
-                    if (taskRecordState == TaskRecordStatus.FAILURE){
-                        status = ExecutionStatus.FAILURE;
+                    AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
+
+                    // replace placeholder
+                    Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+                            taskProps.getDefinedParams(),
+                            params.getLocalParametersMap(),
+                            processInstance.getCmdTypeIfComplement(),
+                            processInstance.getScheduleTime());
+                    if (paramsMap != null && !paramsMap.isEmpty()
+                            && paramsMap.containsKey("v_proc_date")){
+                        String vProcDate = paramsMap.get("v_proc_date").getValue();
+                        if (!StringUtils.isEmpty(vProcDate)){
+                            TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
+                            logger.info("task record status : {}",taskRecordState);
+                            if (taskRecordState == TaskRecordStatus.FAILURE){
+                                status = ExecutionStatus.FAILURE;
+                            }
+                        }
                     }
                 }
 
@@ -271,6 +284,39 @@ public class TaskScheduleThread implements Callable<Boolean> {
     }
 
 
+    /**
+     * get current task parameter class
+     * @return
+     */
+    private Class getCurTaskParamsClass(){
+        Class paramsClass = null;
+        TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
+        switch (taskType){
+            case SHELL:
+                paramsClass = ShellParameters.class;
+                break;
+            case SQL:
+                paramsClass = SqlParameters.class;
+                break;
+            case PROCEDURE:
+                paramsClass = ProcedureParameters.class;
+                break;
+            case MR:
+                paramsClass = MapreduceParameters.class;
+                break;
+            case SPARK:
+                paramsClass = SparkParameters.class;
+                break;
+            case PYTHON:
+                paramsClass = PythonParameters.class;
+                break;
+            default:
+                logger.error("not support this task type: {}", taskType);
+                throw new IllegalArgumentException("not support this task type");
+        }
+        return paramsClass;
+    }
+
     /**
      *  kill task
      */