Forráskód Böngészése

qianfan task result add judge update

qiaozhanwei 5 éve
szülő
commit
8b88cd0b3d

+ 39 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@@ -25,7 +25,12 @@ import cn.escheduler.common.model.TaskNode;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
 import cn.escheduler.common.task.TaskTimeoutParameter;
+import cn.escheduler.common.task.mr.MapreduceParameters;
+import cn.escheduler.common.task.procedure.ProcedureParameters;
+import cn.escheduler.common.task.python.PythonParameters;
 import cn.escheduler.common.task.shell.ShellParameters;
+import cn.escheduler.common.task.spark.SparkParameters;
+import cn.escheduler.common.task.sql.SqlParameters;
 import cn.escheduler.common.utils.*;
 import cn.escheduler.dao.ProcessDao;
 import cn.escheduler.dao.TaskRecordDao;
@@ -206,7 +211,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
                 if (TaskRecordDao.getTaskRecordFlag()
                         && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
 
-                    AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class);
+                    AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
 
                     // replace placeholder
                     Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
@@ -279,6 +284,39 @@ public class TaskScheduleThread implements Callable<Boolean> {
     }
 
 
+    /**
+     * get current task parameter class
+     * @return
+     */
+    private Class getCurTaskParamsClass(){
+        Class paramsClass = null;
+        TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
+        switch (taskType){
+            case SHELL:
+                paramsClass = ShellParameters.class;
+                break;
+            case SQL:
+                paramsClass = SqlParameters.class;
+                break;
+            case PROCEDURE:
+                paramsClass = ProcedureParameters.class;
+                break;
+            case MR:
+                paramsClass = MapreduceParameters.class;
+                break;
+            case SPARK:
+                paramsClass = SparkParameters.class;
+                break;
+            case PYTHON:
+                paramsClass = PythonParameters.class;
+                break;
+            default:
+                logger.error("not support this task type: {}", taskType);
+                throw new IllegalArgumentException("not support this task type");
+        }
+        return paramsClass;
+    }
+
     /**
      *  kill task
      */