Browse Source

worker code optimization

qiaozhanwei 5 years ago
parent
commit
67c28c76a0
31 changed files with 998 additions and 740 deletions
  1. 1 1
      escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java
  2. 3 3
      escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java
  3. 15 10
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  4. 29 0
      escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java
  5. 38 1
      escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java
  6. 17 0
      escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
  7. 13 0
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  8. 20 1
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  9. 2 4
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java
  10. 0 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java
  11. 39 34
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  12. 8 0
      escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java
  13. 1 2
      escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
  14. 1 1
      escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java
  15. 0 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java
  16. 108 93
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  17. 109 174
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
  18. 7 1
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
  19. 112 1
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
  20. 14 11
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
  21. 27 14
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
  22. 10 6
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
  23. 70 16
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
  24. 0 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
  25. 1 1
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
  26. 4 3
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
  27. 172 165
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
  28. 23 49
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
  29. 17 15
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
  30. 2 4
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
  31. 135 124
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

+ 1 - 1
escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java

@@ -116,7 +116,7 @@ public class LoginController extends BaseController {
             response.setStatus(HttpStatus.SC_OK);
             response.addCookie(new Cookie(Constants.SESSION_ID, sessionId));
 
-            logger.info("sessionId = " + sessionId);
+            logger.info("sessionId : {}" , sessionId);
             return success(LOGIN_SUCCESS.getMsg(), sessionId);
         } catch (Exception e) {
             logger.error(USER_LOGIN_FAILURE.getMsg(),e);

+ 3 - 3
escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java

@@ -68,7 +68,7 @@ public class SessionService extends BaseService{
     String ip = BaseController.getClientIpAddress(request);
     logger.info("get session: {}, ip: {}", sessionId, ip);
 
-    return sessionMapper.queryByIdAndIp(sessionId, ip);
+    return sessionMapper.queryByIdAndIp(sessionId);
   }
 
   /**
@@ -80,7 +80,7 @@ public class SessionService extends BaseService{
    */
   public String createSession(User user, String ip) {
     // logined
-    Session session = sessionMapper.queryByUserIdAndIp(user.getId(), ip);
+    Session session = sessionMapper.queryByUserIdAndIp(user.getId());
     Date now = new Date();
 
     /**
@@ -126,7 +126,7 @@ public class SessionService extends BaseService{
     /**
      * query session by user id and ip
      */
-    Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(), ip);
+    Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId());
     //delete session
     sessionMapper.deleteById(session.getId());
   }

+ 15 - 10
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@@ -119,10 +119,6 @@ public final class Constants {
      */
     public static final String ESCHEDULER_ENV_PATH = "escheduler.env.path";
 
-    /**
-     * escheduler.env.sh
-     */
-    public static final String ESCHEDULER_ENV_SH = ".escheduler_env.sh";
 
     /**
      * python home
@@ -220,9 +216,9 @@ public final class Constants {
     public static final String SEMICOLON = ";";
 
     /**
-     * DOT .
+     * EQUAL SIGN
      */
-    public static final String DOT = ".";
+    public static final String EQUAL_SIGN = "=";
 
     /**
      * ZOOKEEPER_SESSION_TIMEOUT
@@ -283,10 +279,6 @@ public final class Constants {
      */
     public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
 
-    /**
-     * date format of yyyyMMdd
-     */
-    public static final String YYYYMMDD = "yyyyMMdd";
 
     /**
      * date format of yyyyMMddHHmmss
@@ -489,6 +481,7 @@ public final class Constants {
     public static final String TASK_RECORD_PWD = "task.record.datasource.password";
 
     public static final String DEFAULT = "Default";
+    public static final String USER = "user";
     public static final String PASSWORD = "password";
     public static final String XXXXXX = "******";
 
@@ -499,6 +492,7 @@ public final class Constants {
     public static final String STATUS = "status";
 
 
+
     /**
      * command parameter keys
      */
@@ -866,6 +860,11 @@ public final class Constants {
      */
     public static final int PREVIEW_SCHEDULE_EXECUTE_COUNT = 5;
 
+    /**
+     * kerberos
+     */
+    public static final String KERBEROS = "kerberos";
+
     /**
      * java.security.krb5.conf
      */
@@ -901,4 +900,10 @@ public final class Constants {
      * loginUserFromKeytab path
      */
     public static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path";
+
+
+    /**
+     * hive conf
+     */
+    public static final String HIVE_CONF = "hiveconf:";
 }

+ 29 - 0
escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cn.escheduler.common.enums;
+
+/**
+ * cycle enums
+ */
+public enum ServerEnum {
+
+    /**
+     * master server , worker server
+     */
+    MASTER_SERVER,WORKER_SERVER
+
+}

+ 38 - 1
escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java

@@ -21,6 +21,8 @@ import cn.escheduler.common.utils.JSONUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static cn.escheduler.common.Constants.*;
+
 /**
  * produce datasource in this custom defined datasource factory.
  */
@@ -49,8 +51,43 @@ public class DataSourceFactory {
           return null;
       }
     } catch (Exception e) {
-      logger.error("Get datasource object error", e);
+      logger.error("get datasource object error", e);
       return null;
     }
   }
+
+  /**
+   * load class
+   * @param dbType
+   * @throws Exception
+   */
+  public static void loadClass(DbType dbType) throws Exception{
+    switch (dbType){
+      case MYSQL :
+        Class.forName(JDBC_MYSQL_CLASS_NAME);
+        break;
+      case POSTGRESQL :
+        Class.forName(JDBC_POSTGRESQL_CLASS_NAME);
+        break;
+      case HIVE :
+        Class.forName(JDBC_HIVE_CLASS_NAME);
+        break;
+      case SPARK :
+        Class.forName(JDBC_SPARK_CLASS_NAME);
+        break;
+      case CLICKHOUSE :
+        Class.forName(JDBC_CLICKHOUSE_CLASS_NAME);
+        break;
+      case ORACLE :
+        Class.forName(JDBC_ORACLE_CLASS_NAME);
+        break;
+      case SQLSERVER:
+        Class.forName(JDBC_SQLSERVER_CLASS_NAME);
+        break;
+      default:
+        logger.error("not support sql type: {},can't load class", dbType);
+        throw new IllegalArgumentException("not support sql type,can't load class");
+
+    }
+  }
 }

+ 17 - 0
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java

@@ -19,6 +19,8 @@ package cn.escheduler.common.utils;
 import cn.escheduler.common.Constants;
 import cn.escheduler.common.enums.ResUploadType;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,4 +76,19 @@ public class CommonUtils {
     Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
     return resUploadType == ResUploadType.HDFS && kerberosStartupState;
   }
+
+  /**
+   * load kerberos configuration
+   * @throws Exception
+   */
+  public static void loadKerberosConf()throws Exception{
+    if (CommonUtils.getKerberosStartupState())  {
+      System.setProperty(JAVA_SECURITY_KRB5_CONF, getString(JAVA_SECURITY_KRB5_CONF_PATH));
+      Configuration configuration = new Configuration();
+      configuration.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
+      UserGroupInformation.setConfiguration(configuration);
+      UserGroupInformation.loginUserFromKeytab(getString(LOGIN_USER_KEY_TAB_USERNAME),
+              getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
+    }
+  }
 }

+ 13 - 0
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@@ -18,6 +18,7 @@ package cn.escheduler.common.zk;
 
 import cn.escheduler.common.Constants;
 import cn.escheduler.common.IStoppable;
+import cn.escheduler.common.enums.ServerEnum;
 import cn.escheduler.common.utils.DateUtils;
 import cn.escheduler.common.utils.OSUtils;
 import org.apache.commons.configuration.Configuration;
@@ -27,6 +28,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -415,6 +417,17 @@ public abstract class AbstractZKClient {
 		return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
 	}
 
+	/**
+	 * acquire zk lock
+	 * @param zkClient
+	 * @param zNodeLockPath
+	 * @throws Exception
+	 */
+	public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String zNodeLockPath)throws Exception{
+		InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath);
+		mutex.acquire();
+		return mutex;
+	}
 
 	@Override
 	public String toString() {

+ 20 - 1
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@@ -1220,6 +1220,26 @@ public class ProcessDao extends AbstractBaseDao {
         return taskInstanceMapper.queryById(taskId);
     }
 
+
+    /**
+     * package task instance,associate processInstance and processDefine
+     * @param taskInstId
+     * @return
+     */
+    public TaskInstance getTaskInstanceRelationByTaskId(int taskInstId){
+        // get task instance
+        TaskInstance taskInstance = findTaskInstanceById(taskInstId);
+        // get process instance
+        ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+        // get process define
+        ProcessDefinition processDefine = findProcessDefineById(taskInstance.getProcessDefinitionId());
+
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setProcessDefine(processDefine);
+        return taskInstance;
+    }
+
+
     /**
      * get id list by task state
      * @param instanceId
@@ -1324,7 +1344,6 @@ public class ProcessDao extends AbstractBaseDao {
                                 String executePath,
                                 String logPath,
                                 int taskInstId) {
-
         TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
         taskInstance.setState(state);
         taskInstance.setStartTime(startTime);

+ 2 - 4
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java

@@ -75,7 +75,6 @@ public interface SessionMapper {
      * query by session id and ip
      *
      * @param sessionId
-     * @param ip
      * @return
      */
     @Results(value = {
@@ -85,13 +84,12 @@ public interface SessionMapper {
             @Result(property = "lastLoginTime", column = "last_login_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
     })
     @SelectProvider(type = SessionMapperProvider.class, method = "queryByIdAndIp")
-    Session queryByIdAndIp(@Param("sessionId") String sessionId, @Param("ip") String ip);
+    Session queryByIdAndIp(@Param("sessionId") String sessionId);
 
 
     /**
      * query by user id and ip
      * @param userId
-     * @param ip
      * @return
      */
     @Results(value = {
@@ -101,6 +99,6 @@ public interface SessionMapper {
             @Result(property = "lastLoginTime", column = "last_login_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
     })
     @SelectProvider(type = SessionMapperProvider.class, method = "queryByUserIdAndIp")
-    Session queryByUserIdAndIp(@Param("userId") int userId, @Param("ip") String ip);
+    Session queryByUserIdAndIp(@Param("userId") int userId);
 
 }

+ 0 - 2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java

@@ -114,7 +114,6 @@ public class SessionMapperProvider {
             FROM(TABLE_NAME);
 
             WHERE("`id` = #{sessionId}");
-            WHERE("`ip` = #{ip}");
         }}.toString();
     }
 
@@ -130,7 +129,6 @@ public class SessionMapperProvider {
             FROM(TABLE_NAME);
 
             WHERE("`user_id` = #{userId}");
-            WHERE("`ip` = #{ip}");
         }}.toString();
     }
 }

+ 39 - 34
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@@ -529,6 +529,39 @@ public class ProcessInstance {
         this.timeout = timeout;
     }
 
+
+    public void setTenantId(int tenantId) {
+        this.tenantId = tenantId;
+    }
+
+    public int getTenantId() {
+        return this.tenantId ;
+    }
+
+    public String getWorkerGroupName() {
+        return workerGroupName;
+    }
+
+    public void setWorkerGroupName(String workerGroupName) {
+        this.workerGroupName = workerGroupName;
+    }
+
+    public String getReceivers() {
+        return receivers;
+    }
+
+    public void setReceivers(String receivers) {
+        this.receivers = receivers;
+    }
+
+    public String getReceiversCc() {
+        return receiversCc;
+    }
+
+    public void setReceiversCc(String receiversCc) {
+        this.receiversCc = receiversCc;
+    }
+
     @Override
     public String toString() {
         return "ProcessInstance{" +
@@ -555,7 +588,6 @@ public class ProcessInstance {
                 ", processInstanceJson='" + processInstanceJson + '\'' +
                 ", executorId=" + executorId +
                 ", tenantCode='" + tenantCode + '\'' +
-                ", tenantId='" + tenantId + '\'' +
                 ", queue='" + queue + '\'' +
                 ", isSubProcess=" + isSubProcess +
                 ", locations='" + locations + '\'' +
@@ -563,40 +595,13 @@ public class ProcessInstance {
                 ", historyCmd='" + historyCmd + '\'' +
                 ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' +
                 ", duration=" + duration +
-                ", timeout=" + timeout +
                 ", processInstancePriority=" + processInstancePriority +
+                ", workerGroupId=" + workerGroupId +
+                ", timeout=" + timeout +
+                ", tenantId=" + tenantId +
+                ", workerGroupName='" + workerGroupName + '\'' +
+                ", receivers='" + receivers + '\'' +
+                ", receiversCc='" + receiversCc + '\'' +
                 '}';
     }
-
-    public void setTenantId(int tenantId) {
-        this.tenantId = tenantId;
-    }
-
-    public int getTenantId() {
-        return this.tenantId ;
-    }
-
-    public String getWorkerGroupName() {
-        return workerGroupName;
-    }
-
-    public void setWorkerGroupName(String workerGroupName) {
-        this.workerGroupName = workerGroupName;
-    }
-
-    public String getReceivers() {
-        return receivers;
-    }
-
-    public void setReceivers(String receivers) {
-        this.receivers = receivers;
-    }
-
-    public String getReceiversCc() {
-        return receiversCc;
-    }
-
-    public void setReceiversCc(String receiversCc) {
-        this.receiversCc = receiversCc;
-    }
 }

+ 8 - 0
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java

@@ -189,6 +189,14 @@ public class TaskInstance {
     private int workerGroupId;
 
 
+
+    public void  init(String host,Date startTime,String executePath){
+        this.host = host;
+        this.startTime = startTime;
+        this.executePath = executePath;
+    }
+
+
     public ProcessInstance getProcessInstance() {
         return processInstance;
     }

+ 1 - 2
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java

@@ -314,8 +314,7 @@ public class ProcessUtils {
       }
 
     } catch (Exception e) {
-      logger.error("kill yarn job failed : " + e.getMessage(),e);
-//      throw new RuntimeException("kill yarn job fail");
+      logger.error("kill yarn job failure",e);
     }
   }
 }

+ 1 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java

@@ -254,7 +254,7 @@ public class WorkerServer implements IStoppable {
                     try {
                         Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     } catch (InterruptedException e) {
-                        logger.error("interrupted exception : " + e.getMessage(),e);
+                        logger.error("interrupted exception",e);
                     }
                     // if set is null , return
                     if (CollectionUtils.isNotEmpty(taskInfoSet)){

+ 0 - 2
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java

@@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
  */
 public class TaskLogAppender extends FileAppender<ILoggingEvent> {
 
-    private static final Logger logger = LoggerFactory.getLogger(TaskLogAppender.class);
-
     private String currentlyActiveFile;
 
     @Override

+ 108 - 93
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
 import cn.escheduler.common.queue.ITaskQueue;
 import cn.escheduler.common.thread.Stopper;
 import cn.escheduler.common.thread.ThreadUtils;
+import cn.escheduler.common.utils.CollectionUtils;
 import cn.escheduler.common.utils.FileUtils;
 import cn.escheduler.common.utils.OSUtils;
 import cn.escheduler.dao.ProcessDao;
@@ -27,7 +28,6 @@ import cn.escheduler.dao.model.*;
 import cn.escheduler.server.zk.ZKWorkerClient;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,8 +74,20 @@ public class FetchTaskThread implements Runnable{
      */
     private int workerExecNums;
 
+    /**
+     * conf
+     */
     private Configuration conf;
 
+    /**
+     *  task instance
+     */
+    private TaskInstance taskInstance;
+
+    /**
+     * task instance id
+     */
+    Integer taskInstId;
 
     public FetchTaskThread(int taskNum, ZKWorkerClient zkWorkerClient,
                            ProcessDao processDao, Configuration conf,
@@ -124,126 +136,101 @@ public class FetchTaskThread implements Runnable{
 
     @Override
     public void run() {
-
         while (Stopper.isRunning()){
             InterProcessMutex mutex = null;
             try {
                 ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
-
                 //check memory and cpu usage and threads
-                if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
-
-                    //whether have tasks, if no tasks , no need lock  //get all tasks
-                    List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
-                    if(tasksQueueList.size() > 0){
-                        // creating distributed locks, lock path /escheduler/lock/worker
-                        String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
-                        mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
-                        mutex.acquire();
+                boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);
 
-                        // task instance id str
-                        List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
-
-                        for(String taskQueueStr : taskQueueStrArr){
-                            if (StringUtils.isNotBlank(taskQueueStr )) {
-
-                                if (!checkThreadCount(poolExecutor)) {
-                                    break;
-                                }
-
-                                String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
-                                String taskInstIdStr = taskStringArray[3];
-                                Date now = new Date();
-                                Integer taskId = Integer.parseInt(taskInstIdStr);
-
-                                // find task instance by task id
-                                TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
-
-                                logger.info("worker fetch taskId : {} from queue ", taskId);
-
-                                int retryTimes = 30;
-                                // mainly to wait for the master insert task to succeed
-                                while (taskInstance == null && retryTimes > 0) {
-                                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-                                    taskInstance = processDao.findTaskInstanceById(taskId);
-                                    retryTimes--;
-                                }
-
-                                if (taskInstance == null ) {
-                                    logger.error("task instance is null. task id : {} ", taskId);
-                                    continue;
-                                }
-
-                                if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
-                                    continue;
-                                }
-                                taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
-                                logger.info("remove task:{} from queue", taskQueueStr);
+                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
 
-                                // set execute task worker host
-                                taskInstance.setHost(OSUtils.getHost());
-                                taskInstance.setStartTime(now);
+                if(!runCheckFlag) {
+                    continue;
+                }
 
+                //whether have tasks, if no tasks , no need lock  //get all tasks
+                List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
+                if (CollectionUtils.isEmpty(tasksQueueList)){
+                    continue;
+                }
+                // creating distributed locks, lock path /escheduler/lock/worker
+                mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
+                        zkWorkerClient.getWorkerLockPath());
 
-                                // get process instance
-                                ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
 
-                                // get process define
-                                ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
+                // task instance id str
+                List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
 
+                for(String taskQueueStr : taskQueueStrArr){
+                    if (StringUtils.isEmpty(taskQueueStr)) {
+                        continue;
+                    }
 
-                                taskInstance.setProcessInstance(processInstance);
-                                taskInstance.setProcessDefine(processDefine);
+                    if (!checkThreadCount(poolExecutor)) {
+                        break;
+                    }
 
+                    // get task instance id
+                    taskInstId = Integer.parseInt(taskQueueStr.split(Constants.UNDERLINE)[3]);
+
+                    // get task instance relation
+                    TaskInstance taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId);
+
+                    Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
+                            taskInstance.getProcessDefine().getUserId());
+                    if(tenant == null){
+                        logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
+                                taskInstance.getProcessDefine().getId(),
+                                taskInstance.getProcessInstance().getId(),
+                                taskInstance.getId());
+                        taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
+                        continue;
+                    }
 
-                                // get local execute path
-                                String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
-                                        processDefine.getId(),
-                                        processInstance.getId(),
-                                        taskInstance.getId());
-                                logger.info("task instance  local execute path : {} ", execLocalPath);
+                    logger.info("worker fetch taskId : {} from queue ", taskInstId);
 
+                    // mainly to wait for the master insert task to succeed
+                    waitForMasterEnterQueue();
 
-                                // set task execute path
-                                taskInstance.setExecutePath(execLocalPath);
+                    if (taskInstance == null ) {
+                        logger.error("task instance is null. task id : {} ", taskInstId);
+                        taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
+                        continue;
+                    }
 
-                            Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
-                                    processDefine.getUserId());
-                            if(tenant == null){
-                                logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}",
-                                        taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId());
-                                continue;
-                            }
+                    if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
+                        continue;
+                    }
 
-                            // check and create Linux users
-                            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
-                                    tenant.getTenantCode(), logger);
+                    // get local execute path
+                    logger.info("task instance  local execute path : {} ", getExecLocalPath());
 
-                                logger.info("task : {} ready to submit to task scheduler thread",taskId);
-                                // submit task
-                                workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
+                    // init task
+                    taskInstance.init(OSUtils.getHost(),
+                            new Date(),
+                            getExecLocalPath());
 
-                            }
-                        }
+                    // check and create Linux users
+                    FileUtils.createWorkDirAndUserIfAbsent(getExecLocalPath(),
+                            tenant.getTenantCode(), logger);
 
-                    }
+                    logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
+                    // submit task
+                    workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
 
+                    // remove node from zk
+                    taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
                 }
 
-                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-
             }catch (Exception e){
-                logger.error("fetch task thread exception : " + e.getMessage(),e);
+                logger.error("fetch task thread failure" ,e);
             }finally {
                 if (mutex != null){
                     try {
                         mutex.release();
                     } catch (Exception e) {
-                        if(e.getMessage().equals("instance must be started before calling this method")){
-                            logger.warn("fetch task lock release");
-                        }else{
-                            logger.error("fetch task lock release failed : " + e.getMessage(),e);
-                        }
+                        logger.error("fetch task lock release failure ",e);
                     }
                 }
             }
@@ -251,16 +238,44 @@ public class FetchTaskThread implements Runnable{
     }
 
     /**
-     *
+     * get execute local path
+     * @return
+     */
+    private String getExecLocalPath(){
+        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getId());
+    }
+    /**
+     *  check
      * @param poolExecutor
      * @return
      */
     private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
         int activeCount = poolExecutor.getActiveCount();
         if (activeCount >= workerExecNums) {
-            logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
+            logger.info("thread insufficient , activeCount : {} , " +
+                            "workerExecNums : {}, will sleep : {} millis for thread resource",
+                    activeCount,
+                    workerExecNums,
+                    Constants.SLEEP_TIME_MILLIS);
             return false;
         }
         return true;
     }
+
+    /**
+     *  mainly to wait for the master insert task to succeed
+     * @throws Exception
+     */
+    private void waitForMasterEnterQueue()throws Exception{
+        int retryTimes = 30;
+
+        while (taskInstance == null && retryTimes > 0) {
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+            taskInstance = processDao.findTaskInstanceById(taskInstId);
+            retryTimes--;
+        }
+    }
 }

+ 109 - 174
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@@ -59,13 +59,16 @@ import java.util.stream.Collectors;
 /**
  *  task scheduler thread
  */
-public class TaskScheduleThread implements Callable<Boolean> {
+public class TaskScheduleThread implements Runnable {
 
     /**
      * logger
      */
     private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class);
 
+    /**
+     * task prefix
+     */
     private static final String TASK_PREFIX = "TASK";
 
     /**
@@ -79,7 +82,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
     private final ProcessDao processDao;
 
     /**
-     *  execute task info
+     *  abstract task
      */
     private AbstractTask task;
 
@@ -89,115 +92,55 @@ public class TaskScheduleThread implements Callable<Boolean> {
     }
 
     @Override
-    public Boolean call() throws Exception {
+    public void run() {
 
-        // get task type
-        String taskType = taskInstance.getTaskType();
-        // set task state
-        taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
-
-        // update task state
-        if(taskType.equals(TaskType.SQL.name())  || taskType.equals(TaskType.PROCEDURE.name())){
-            processDao.changeTaskState(taskInstance.getState(),
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    System.getProperty("user.dir") + "/logs/" +
-                            taskInstance.getProcessDefinitionId() +"/" +
-                            taskInstance.getProcessInstanceId() +"/" +
-                            taskInstance.getId() + ".log",
-                    taskInstance.getId());
-        }else{
-            processDao.changeTaskState(taskInstance.getState(),
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    taskInstance.getExecutePath(),
-                    System.getProperty("user.dir") + "/logs/" +
-                            taskInstance.getProcessDefinitionId() +"/" +
-                            taskInstance.getProcessInstanceId() +"/" +
-                            taskInstance.getId() + ".log",
-                    taskInstance.getId());
-        }
-
-        ExecutionStatus status = ExecutionStatus.SUCCESS;
+        // update task state is running according to task type
+        updateTaskState(taskInstance.getTaskType());
 
         try {
+            logger.info("script path : {}", taskInstance.getExecutePath());
+            // task node
+            TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
 
-
-            // custom param str
-            String customParamStr = taskInstance.getProcessInstance().getGlobalParams();
-
-
-            Map<String,String> allParamMap = new HashMap<>();
-
-
-            if (customParamStr != null) {
-                List<Property> customParamMap = JSONObject.parseArray(customParamStr, Property.class);
-
-                Map<String,String> userDefinedParamMap = customParamMap.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
-
-                allParamMap.putAll(userDefinedParamMap);
-            }
-
-            logger.info("script path : {}",taskInstance.getExecutePath());
-
-            TaskProps taskProps = new TaskProps();
-
-            taskProps.setTaskDir(taskInstance.getExecutePath());
-
-            String taskJson = taskInstance.getTaskJson();
-
-
-            TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
-
-
-            List<String> projectRes = createProjectResFiles(taskNode);
-
-            // copy hdfs file to local
+            // copy hdfs/minio file to local
             copyHdfsToLocal(processDao,
                     taskInstance.getExecutePath(),
-                    projectRes,
+                    createProjectResFiles(taskNode),
                     logger);
 
-            // set task params
-            taskProps.setTaskParams(taskNode.getParams());
-            // set tenant code , execute task linux user
-
-            ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
-
-            taskProps.setScheduleTime(processInstance.getScheduleTime());
-            taskProps.setNodeName(taskInstance.getName());
-            taskProps.setTaskInstId(taskInstance.getId());
-            taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
-
-            ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
+            // get process instance according to tak instance
+            ProcessInstance processInstance = taskInstance.getProcessInstance();
+            // get process define according to tak instance
+            ProcessDefinition processDefine = taskInstance.getProcessDefine();
 
+            // get tenant info
             Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
-                    processDefine.getUserId());
+                                                    processDefine.getUserId());
 
             if(tenant == null){
-                processInstance.setTenantCode(tenant.getTenantCode());
-                logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}",
-                        processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId()
-                );
-                status = ExecutionStatus.FAILURE;
+                logger.error("cannot find the tenant, process definition id:{}, user id:{}",
+                        processDefine.getId(),
+                        processDefine.getUserId());
+                task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
             }else{
-                taskProps.setTenantCode(tenant.getTenantCode());
-                String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
-                // set queue
-                if (StringUtils.isEmpty(queue)){
-                    taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
-                }else {
-                    taskProps.setQueue(tenant.getQueueName());
-                }
-                taskProps.setTaskStartTime(taskInstance.getStartTime());
-                taskProps.setDefinedParams(allParamMap);
 
+                // set task props
+                TaskProps taskProps = new TaskProps(taskNode.getParams(),
+                        taskInstance.getExecutePath(),
+                        processInstance.getScheduleTime(),
+                        taskInstance.getName(),
+                        taskInstance.getTaskType(),
+                        taskInstance.getId(),
+                        CommonUtils.getSystemEnvPath(),
+                        tenant.getTenantCode(),
+                        tenant.getQueueName(),
+                        taskInstance.getStartTime(),
+                        getGlobalParamsMap(),
+                        taskInstance.getDependency(),
+                        processInstance.getCmdTypeIfComplement());
                 // set task timeout
                 setTaskTimeout(taskProps, taskNode);
 
-                taskProps.setDependence(taskInstance.getDependency());
-
                 taskProps.setTaskAppId(String.format("%s_%s_%s",
                         taskInstance.getProcessDefine().getId(),
                         taskInstance.getProcessInstance().getId(),
@@ -209,72 +152,98 @@ public class TaskScheduleThread implements Callable<Boolean> {
                         taskInstance.getProcessInstance().getId(),
                         taskInstance.getId()));
 
-                task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
+                task = TaskManager.newTask(taskInstance.getTaskType(),
+                        taskProps,
+                        taskLogger);
 
-                // job init
+                // task init
                 task.init();
 
-                // job handle
+                // task handle
                 task.handle();
-                logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
-
-                if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
-                    status = ExecutionStatus.SUCCESS;
-                    // task recor flat : if true , start up qianfan
-                    if (TaskRecordDao.getTaskRecordFlag()
-                            && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
-
-                        AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
-
-                        // replace placeholder
-                        Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
-                                taskProps.getDefinedParams(),
-                                params.getLocalParametersMap(),
-                                processInstance.getCmdTypeIfComplement(),
-                                processInstance.getScheduleTime());
-                        if (paramsMap != null && !paramsMap.isEmpty()
-                                && paramsMap.containsKey("v_proc_date")){
-                            String vProcDate = paramsMap.get("v_proc_date").getValue();
-                            if (!StringUtils.isEmpty(vProcDate)){
-                                TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
-                                logger.info("task record status : {}",taskRecordState);
-                                if (taskRecordState == TaskRecordStatus.FAILURE){
-                                    status = ExecutionStatus.FAILURE;
-                                }
-                            }
-                        }
-                    }
 
-                }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
-                    status = ExecutionStatus.KILL;
-                }else {
-                    status = ExecutionStatus.FAILURE;
-                }
+                // task result process
+                task.after();
             }
         }catch (Exception e){
-            logger.error("task escheduler failure : ", e);
-            status = ExecutionStatus.FAILURE ;
-            logger.error(String.format("task process exception, process id : %s , task : %s",
-                    taskInstance.getProcessInstanceId(),
-                    taskInstance.getName()),e);
+            logger.error("task scheduler failure", e);
+            task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
             kill();
         }
+
+        logger.info("task instance id : {},task final status : {}",
+                taskInstance.getId(),
+                task.getExitStatus());
         // update task instance state
-        processDao.changeTaskState(status,
+        processDao.changeTaskState(task.getExitStatus(),
                 new Date(),
                 taskInstance.getId());
-        return task.getExitStatusCode() > Constants.EXIT_CODE_SUCCESS;
     }
 
     /**
-     * set task time out
+     * get global paras map
+     * @return
+     */
+    private Map<String, String> getGlobalParamsMap() {
+        Map<String,String> globalParamsMap = new HashMap<>(16);
+
+        // global params string
+        String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams();
+
+        if (globalParamsStr != null) {
+            List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
+            globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
+        }
+        return globalParamsMap;
+    }
+
+    /**
+     *  update task state according to task type
+     * @param taskType
+     */
+    private void updateTaskState(String taskType) {
+        // update task status is running
+        if(taskType.equals(TaskType.SQL.name())  ||
+                taskType.equals(TaskType.PROCEDURE.name())){
+            processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    getTaskLogPath(),
+                    taskInstance.getId());
+        }else{
+            processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    taskInstance.getExecutePath(),
+                    getTaskLogPath(),
+                    taskInstance.getId());
+        }
+    }
+
+    /**
+     *  get task log path
+     * @return
+     */
+    private String getTaskLogPath() {
+        return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
+                "logs" +  Constants.SINGLE_SLASH +
+                taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH  +
+                taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH  +
+                taskInstance.getId() + ".log";
+    }
+
+    /**
+     * set task timeout
      * @param taskProps
      * @param taskNode
      */
     private void setTaskTimeout(TaskProps taskProps, TaskNode taskNode) {
+        // the default timeout is the maximum value of the integer
         taskProps.setTaskTimeout(Integer.MAX_VALUE);
         TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
         if (taskTimeoutParameter.getEnable()){
+            // get timeout strategy
             taskProps.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy());
             switch (taskTimeoutParameter.getStrategy()){
                 case WARN:
@@ -298,38 +267,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
     }
 
 
-    /**
-     * get current task parameter class
-     * @return
-     */
-    private Class getCurTaskParamsClass(){
-        Class paramsClass = null;
-        TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
-        switch (taskType){
-            case SHELL:
-                paramsClass = ShellParameters.class;
-                break;
-            case SQL:
-                paramsClass = SqlParameters.class;
-                break;
-            case PROCEDURE:
-                paramsClass = ProcedureParameters.class;
-                break;
-            case MR:
-                paramsClass = MapreduceParameters.class;
-                break;
-            case SPARK:
-                paramsClass = SparkParameters.class;
-                break;
-            case PYTHON:
-                paramsClass = PythonParameters.class;
-                break;
-            default:
-                logger.error("not support this task type: {}", taskType);
-                throw new IllegalArgumentException("not support this task type");
-        }
-        return paramsClass;
-    }
+
 
     /**
      *  kill task
@@ -376,9 +314,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
             File resFile = new File(execLocalPath, res);
             if (!resFile.exists()) {
                 try {
-                    /**
-                     * query the tenant code of the resource according to the name of the resource
-                     */
+                    // query the tenant code of the resource according to the name of the resource
                     String tentnCode = processDao.queryTenantCodeByResName(res);
                     String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode,res);
 
@@ -388,7 +324,6 @@ public class TaskScheduleThread implements Callable<Boolean> {
                     logger.error(e.getMessage(),e);
                     throw new RuntimeException(e.getMessage());
                 }
-
             } else {
                 logger.info("file : {} exists ", resFile.getName());
             }

+ 7 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java

@@ -67,6 +67,11 @@ public abstract class AbstractCommandExecutor {
      */
     protected final String taskAppId;
 
+    /**
+     *  task appId
+     */
+    protected final int taskInstId;
+
     /**
      *  tenant code , execute task linux user
      */
@@ -99,11 +104,12 @@ public abstract class AbstractCommandExecutor {
 
 
     public AbstractCommandExecutor(Consumer<List<String>> logHandler,
-                                   String taskDir, String taskAppId, String tenantCode, String envFile,
+                                   String taskDir, String taskAppId,int taskInstId,String tenantCode, String envFile,
                                    Date startTime, int timeout, Logger logger){
         this.logHandler = logHandler;
         this.taskDir = taskDir;
         this.taskAppId = taskAppId;
+        this.taskInstId = taskInstId;
         this.tenantCode = tenantCode;
         this.envFile = envFile;
         this.startTime = startTime;

+ 112 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java

@@ -16,10 +16,26 @@
  */
 package cn.escheduler.server.worker.task;
 
+import cn.escheduler.common.Constants;
+import cn.escheduler.common.enums.ExecutionStatus;
+import cn.escheduler.common.enums.TaskRecordStatus;
+import cn.escheduler.common.enums.TaskType;
+import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
+import cn.escheduler.common.task.mr.MapreduceParameters;
+import cn.escheduler.common.task.procedure.ProcedureParameters;
+import cn.escheduler.common.task.python.PythonParameters;
+import cn.escheduler.common.task.shell.ShellParameters;
+import cn.escheduler.common.task.spark.SparkParameters;
+import cn.escheduler.common.task.sql.SqlParameters;
+import cn.escheduler.common.utils.JSONUtils;
+import cn.escheduler.dao.TaskRecordDao;
+import cn.escheduler.server.utils.ParamUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  *  executive task
@@ -70,7 +86,7 @@ public abstract class AbstractTask {
 
 
     public void cancelApplication(boolean status) throws Exception {
-        cancel = true;
+        this.cancel = status;
     }
 
     /**
@@ -89,6 +105,9 @@ public abstract class AbstractTask {
         return exitStatusCode;
     }
 
+    public void setExitStatusCode(int exitStatusCode) {
+        this.exitStatusCode = exitStatusCode;
+    }
 
     /**
      * get task parameters
@@ -96,4 +115,96 @@ public abstract class AbstractTask {
     public abstract AbstractParameters getParameters();
 
 
+    /**
+     * result processing
+     */
+    public void after(){
+        if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
+            // task recor flat : if true , start up qianfan
+            if (TaskRecordDao.getTaskRecordFlag()
+                    && TaskType.typeIsNormalTask(taskProps.getTaskType())){
+                AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
+
+                // replace placeholder
+                Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+                        taskProps.getDefinedParams(),
+                        params.getLocalParametersMap(),
+                        taskProps.getCmdTypeIfComplement(),
+                        taskProps.getScheduleTime());
+                if (paramsMap != null && !paramsMap.isEmpty()
+                        && paramsMap.containsKey("v_proc_date")){
+                    String vProcDate = paramsMap.get("v_proc_date").getValue();
+                    if (!StringUtils.isEmpty(vProcDate)){
+                        TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate);
+                        logger.info("task record status : {}",taskRecordState);
+                        if (taskRecordState == TaskRecordStatus.FAILURE){
+                            setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+                        }
+                    }
+                }
+            }
+
+        }else if (getExitStatusCode() == Constants.EXIT_CODE_KILL){
+            setExitStatusCode(Constants.EXIT_CODE_KILL);
+        }else {
+            setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+        }
+    }
+
+
+
+
+    /**
+     * get current task parameter class
+     * @return
+     */
+    private Class getCurTaskParamsClass(){
+        Class paramsClass = null;
+        // get task type
+        TaskType taskType = TaskType.valueOf(taskProps.getTaskType());
+        switch (taskType){
+            case SHELL:
+                paramsClass = ShellParameters.class;
+                break;
+            case SQL:
+                paramsClass = SqlParameters.class;
+                break;
+            case PROCEDURE:
+                paramsClass = ProcedureParameters.class;
+                break;
+            case MR:
+                paramsClass = MapreduceParameters.class;
+                break;
+            case SPARK:
+                paramsClass = SparkParameters.class;
+                break;
+            case PYTHON:
+                paramsClass = PythonParameters.class;
+                break;
+            default:
+                logger.error("not support this task type: {}", taskType);
+                throw new IllegalArgumentException("not support this task type");
+        }
+        return paramsClass;
+    }
+
+    /**
+     *  get exit status according to exitCode
+     * @return
+     */
+    public ExecutionStatus getExitStatus(){
+        ExecutionStatus status;
+        switch (getExitStatusCode()){
+            case Constants.EXIT_CODE_SUCCESS:
+                status = ExecutionStatus.SUCCESS;
+                break;
+            case Constants.EXIT_CODE_KILL:
+                status = ExecutionStatus.KILL;
+                break;
+            default:
+                status = ExecutionStatus.FAILURE;
+                break;
+        }
+        return status;
+    }
 }

+ 14 - 11
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java

@@ -38,7 +38,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
   /**
    *  process task
    */
-  private ShellCommandExecutor processTask;
+  private ShellCommandExecutor shellCommandExecutor;
 
   /**
    *  process database access
@@ -53,21 +53,25 @@ public abstract class AbstractYarnTask extends AbstractTask {
   public AbstractYarnTask(TaskProps taskProps, Logger logger) {
     super(taskProps, logger);
     this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
-    // find process instance by taskId
     this.processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
-    this.processTask = new ShellCommandExecutor(this::logHandle,
-            taskProps.getTaskDir(), taskProps.getTaskAppId(),
-            taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(),
-            taskProps.getTaskTimeout(), logger);
+    this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
+            taskProps.getTaskDir(),
+            taskProps.getTaskAppId(),
+            taskProps.getTaskInstId(),
+            taskProps.getTenantCode(),
+            taskProps.getEnvFile(),
+            taskProps.getTaskStartTime(),
+            taskProps.getTaskTimeout(),
+            logger);
   }
 
   @Override
   public void handle() throws Exception {
     try {
       // construct process
-      exitStatusCode = processTask.run(buildCommand(), processDao);
+      exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
     } catch (Exception e) {
-      logger.error("yarn process failed : " + e.getMessage(), e);
+      logger.error("yarn process failure", e);
       exitStatusCode = -1;
     }
   }
@@ -76,9 +80,8 @@ public abstract class AbstractYarnTask extends AbstractTask {
   public void cancelApplication(boolean status) throws Exception {
     cancel = true;
     // cancel process
-    processTask.cancelApplication();
-    int taskInstId = taskProps.getTaskInstId();
-    TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+    shellCommandExecutor.cancelApplication();
+    TaskInstance taskInstance = processDao.findTaskInstanceById(taskProps.getTaskInstId());
     if (status && taskInstance != null){
       ProcessUtils.killYarnJob(taskInstance);
     }

+ 27 - 14
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java

@@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task;
 
 import cn.escheduler.common.Constants;
 import cn.escheduler.common.utils.FileUtils;
-import cn.escheduler.common.utils.PropertyUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +42,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
 
 
     public PythonCommandExecutor(Consumer<List<String>> logHandler,
-                                 String taskDir, String taskAppId, String tenantCode, String envFile,
-                                 Date startTime, int timeout, Logger logger) {
-        super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger);
+                                 String taskDir,
+                                 String taskAppId,
+                                 int taskInstId,
+                                 String tenantCode,
+                                 String envFile,
+                                 Date startTime,
+                                 int timeout,
+                                 Logger logger) {
+        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
     }
 
 
@@ -67,7 +72,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
      */
     @Override
     protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
-        logger.info("tenant :{}, work dir:{}", tenantCode, taskDir);
+        logger.info("tenantCode :{}, task dir:{}", tenantCode, taskDir);
 
         if (!Files.exists(Paths.get(commandFile))) {
             logger.info("generate command file:{}", commandFile);
@@ -80,16 +85,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
             logger.info(sb.toString());
 
             // write data to file
-            FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
+            FileUtils.writeStringToFile(new File(commandFile),
+                    sb.toString(),
+                    StandardCharsets.UTF_8);
         }
     }
 
     @Override
     protected String commandType() {
-
-        String envPath = PropertyUtils.getString(Constants.ESCHEDULER_ENV_PATH);
-
-        String pythonHome = getPythonHome(envPath);
+        String pythonHome = getPythonHome(envFile);
         if (StringUtils.isEmpty(pythonHome)){
             return PYTHON;
         }
@@ -108,16 +112,25 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
 
 
     /**
-     *  get python home
+     *  get the absolute path of the Python command
+     *  note :
+     *  common.properties
+     *  PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
+     *
+     *  for example :
+     *  your PYTHON_HOM is /opt/python3.7/
+     *  you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
+     *  escheduler.env.path file.
+     *
      * @param envPath
      * @return
      */
     private static String getPythonHome(String envPath){
         BufferedReader br = null;
-        String line = null;
         StringBuilder sb = new StringBuilder();
         try {
             br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
+            String line;
             while ((line = br.readLine()) != null){
                 if (line.contains(Constants.PYTHON_HOME)){
                     sb.append(line);
@@ -128,13 +141,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
             if (org.apache.commons.lang.StringUtils.isEmpty(result)){
                 return null;
             }
-            String[] arrs = result.split("=");
+            String[] arrs = result.split(Constants.EQUAL_SIGN);
             if (arrs.length == 2){
                 return arrs[1];
             }
 
         }catch (IOException e){
-            logger.error("read file failed : " + e.getMessage(),e);
+            logger.error("read file failure",e);
         }finally {
             try {
                 if (br != null){

+ 10 - 6
escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java

@@ -29,9 +29,7 @@ import java.util.List;
 import java.util.function.Consumer;
 
 /**
- *  command executor
- *
- *  进程,真正在worker服务器上执行的任务
+ *  shell command executor
  */
 public class ShellCommandExecutor extends AbstractCommandExecutor {
 
@@ -39,9 +37,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
 
 
     public ShellCommandExecutor(Consumer<List<String>> logHandler,
-                                String taskDir, String taskAppId, String tenantCode, String envFile,
-                                Date startTime, int timeout, Logger logger) {
-        super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger);
+                                String taskDir,
+                                String taskAppId,
+                                int taskInstId,
+                                String tenantCode,
+                                String envFile,
+                                Date startTime,
+                                int timeout,
+                                Logger logger) {
+        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
     }
 
 

+ 70 - 16
escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java

@@ -16,6 +16,7 @@
  */
 package cn.escheduler.server.worker.task;
 
+import cn.escheduler.common.enums.CommandType;
 import cn.escheduler.common.enums.DataType;
 import cn.escheduler.common.enums.Direct;
 import cn.escheduler.common.enums.TaskTimeoutStrategy;
@@ -46,6 +47,8 @@ public class TaskProps {
    **/
   private String tenantCode;
 
+  private String taskType;
+
   /**
    *  task parameters
    **/
@@ -101,6 +104,41 @@ public class TaskProps {
    */
   private Date scheduleTime;
 
+  /**
+   *  command type is complement
+   */
+  private CommandType cmdTypeIfComplement;
+
+
+  public TaskProps(){}
+  public TaskProps(String taskParams,
+                   String taskDir,
+                   Date scheduleTime,
+                   String nodeName,
+                   String taskType,
+                   int taskInstId,
+                   String envFile,
+                   String tenantCode,
+                   String queue,
+                   Date taskStartTime,
+                   Map<String, String> definedParams,
+                   String dependence,
+                   CommandType cmdTypeIfComplement){
+    this.taskParams = taskParams;
+    this.taskDir = taskDir;
+    this.scheduleTime = scheduleTime;
+    this.nodeName = nodeName;
+    this.taskType = taskType;
+    this.taskInstId = taskInstId;
+    this.envFile = envFile;
+    this.tenantCode = tenantCode;
+    this.queue = queue;
+    this.taskStartTime = taskStartTime;
+    this.definedParams = definedParams;
+    this.dependence = dependence;
+    this.cmdTypeIfComplement = cmdTypeIfComplement;
+
+  }
 
   public String getTenantCode() {
     return tenantCode;
@@ -200,22 +238,12 @@ public class TaskProps {
     this.taskTimeoutStrategy = taskTimeoutStrategy;
   }
 
-  /**
-   *  get parameters map
-   * @return
-   */
-  public Map<String,Property> getUserDefParamsMap() {
-    if (definedParams != null) {
-      Map<String,Property> userDefParamsMaps = new HashMap<>();
-      Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
-      while (iter.hasNext()){
-        Map.Entry<String, String> en = iter.next();
-        Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
-        userDefParamsMaps.put(property.getProp(),property);
-      }
-      return userDefParamsMaps;
-    }
-    return null;
+  public String getTaskType() {
+    return taskType;
+  }
+
+  public void setTaskType(String taskType) {
+    this.taskType = taskType;
   }
 
   public String getDependence() {
@@ -233,4 +261,30 @@ public class TaskProps {
   public void setScheduleTime(Date scheduleTime) {
     this.scheduleTime = scheduleTime;
   }
+
+  public CommandType getCmdTypeIfComplement() {
+    return cmdTypeIfComplement;
+  }
+
+  public void setCmdTypeIfComplement(CommandType cmdTypeIfComplement) {
+    this.cmdTypeIfComplement = cmdTypeIfComplement;
+  }
+
+  /**
+   *  get parameters map
+   * @return
+   */
+  public Map<String,Property> getUserDefParamsMap() {
+    if (definedParams != null) {
+      Map<String,Property> userDefParamsMaps = new HashMap<>();
+      Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
+      while (iter.hasNext()){
+        Map.Entry<String, String> en = iter.next();
+        Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
+        userDefParamsMaps.put(property.getProp(),property);
+      }
+      return userDefParamsMaps;
+    }
+    return null;
+  }
 }

+ 0 - 2
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java

@@ -208,6 +208,4 @@ public class DependentExecute {
         return dependResultMap;
     }
 
-
-
 }

+ 1 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java

@@ -104,7 +104,7 @@ public class DependentTask extends AbstractTask {
                         Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
             }
         }catch (Exception e){
-            logger.error("Exception " + e);
+            logger.error(e.getMessage(),e);
             exitStatusCode = -1;
         }
     }

+ 4 - 3
escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java

@@ -70,8 +70,8 @@ public class MapReduceTask extends AbstractYarnTask {
         Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
                 taskProps.getDefinedParams(),
                 mapreduceParameters.getLocalParametersMap(),
-                processInstance.getCmdTypeIfComplement(),
-                processInstance.getScheduleTime());
+                taskProps.getCmdTypeIfComplement(),
+                taskProps.getScheduleTime());
         if (paramsMap != null){
             String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),  ParamUtils.convert(paramsMap));
             mapreduceParameters.setMainArgs(args);
@@ -86,7 +86,8 @@ public class MapReduceTask extends AbstractYarnTask {
     protected String buildCommand() throws Exception {
         List<String> parameterList = buildParameters(mapreduceParameters);
 
-        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList), taskProps.getDefinedParams());
+        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
+                taskProps.getDefinedParams());
         logger.info("mapreduce task command: {}", command);
 
         return command;

+ 172 - 165
escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java

@@ -21,12 +21,7 @@ import cn.escheduler.common.enums.DataType;
 import cn.escheduler.common.enums.DbType;
 import cn.escheduler.common.enums.Direct;
 import cn.escheduler.common.enums.TaskTimeoutStrategy;
-import cn.escheduler.common.job.db.BaseDataSource;
-import cn.escheduler.common.job.db.ClickHouseDataSource;
-import cn.escheduler.common.job.db.MySQLDataSource;
-import cn.escheduler.common.job.db.OracleDataSource;
-import cn.escheduler.common.job.db.PostgreDataSource;
-import cn.escheduler.common.job.db.SQLServerDataSource;
+import cn.escheduler.common.job.db.*;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
 import cn.escheduler.common.task.procedure.ProcedureParameters;
@@ -49,6 +44,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import static cn.escheduler.common.enums.DataType.*;
+
 /**
  *  procedure task
  */
@@ -64,6 +61,11 @@ public class ProcedureTask extends AbstractTask {
      */
     private ProcessDao processDao;
 
+    /**
+     * base datasource
+     */
+    private BaseDataSource baseDataSource;
+
     public ProcedureTask(TaskProps taskProps, Logger logger) {
         super(taskProps, logger);
 
@@ -93,176 +95,181 @@ public class ProcedureTask extends AbstractTask {
 
         // determine whether there is a data source
         if (procedureParameters.getDatasource() == 0){
-            logger.error("datasource is null");
-            exitStatusCode = 0;
-        }else {
+            logger.error("datasource id not exists");
+            exitStatusCode = -1;
+            return;
+        }
 
-            DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
-            logger.info("datasource name : {} , type : {} , desc : {} ,  user_id : {} , parameter : {}",
-                    dataSource.getName(),dataSource.getType(),dataSource.getNote(),
-                    dataSource.getUserId(),dataSource.getConnectionParams());
+        DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
+        logger.info("datasource name : {} , type : {} , desc : {} ,  user_id : {} , parameter : {}",
+                dataSource.getName(),
+                dataSource.getType(),
+                dataSource.getNote(),
+                dataSource.getUserId(),
+                dataSource.getConnectionParams());
+
+        if (dataSource == null){
+            logger.error("datasource not exists");
+            exitStatusCode = -1;
+            return;
+        }
+        Connection connection = null;
+        CallableStatement stmt = null;
+        try {
+            // load class
+            DataSourceFactory.loadClass(dataSource.getType());
+            // get datasource
+            baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
+                    dataSource.getConnectionParams());
 
-            if (dataSource != null){
-                Connection connection = null;
-                CallableStatement stmt = null;
-                try {
-                    BaseDataSource baseDataSource = null;
-
-                    if (DbType.MYSQL.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class);
-                        Class.forName(Constants.JDBC_MYSQL_CLASS_NAME);
-                    }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
-                        Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
-                    }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
-                        // NOTE: currently, ClickHouse don't support procedure or UDF yet,
-                        //  but still load JDBC driver to keep source code sync with other DB
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
-                        Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
-                    }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), OracleDataSource.class);
-                        Class.forName(Constants.JDBC_ORACLE_CLASS_NAME);
-                    }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), SQLServerDataSource.class);
-                        Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME);
-                    }
+            // get jdbc connection
+            connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+                    baseDataSource.getUser(),
+                    baseDataSource.getPassword());
 
-                    // get jdbc connection
-                    connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
-                            baseDataSource.getUser(),
-                            baseDataSource.getPassword());
 
-                    // get process instance by task instance id
-                    ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
 
-                    // combining local and global parameters
-                    Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
-                            taskProps.getDefinedParams(),
-                            procedureParameters.getLocalParametersMap(),
-                            processInstance.getCmdTypeIfComplement(),
-                            processInstance.getScheduleTime());
+            // combining local and global parameters
+            Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+                    taskProps.getDefinedParams(),
+                    procedureParameters.getLocalParametersMap(),
+                    taskProps.getCmdTypeIfComplement(),
+                    taskProps.getScheduleTime());
 
 
-                    Collection<Property> userDefParamsList = null;
+            Collection<Property> userDefParamsList = null;
 
-                    if (procedureParameters.getLocalParametersMap() != null){
-                        userDefParamsList = procedureParameters.getLocalParametersMap().values();
-                    }
+            if (procedureParameters.getLocalParametersMap() != null){
+                userDefParamsList = procedureParameters.getLocalParametersMap().values();
+            }
 
-                    String method = "";
-                    // no parameters
-                    if (CollectionUtils.isEmpty(userDefParamsList)){
-                        method = "{call " + procedureParameters.getMethod() + "}";
-                    }else { // exists parameters
-                        int size = userDefParamsList.size();
-                        StringBuilder parameter = new StringBuilder();
-                        parameter.append("(");
-                        for (int i = 0 ;i < size - 1; i++){
-                            parameter.append("?,");
-                        }
-                        parameter.append("?)");
-                        method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
-                    }
+            String method = "";
+            // no parameters
+            if (CollectionUtils.isEmpty(userDefParamsList)){
+                method = "{call " + procedureParameters.getMethod() + "}";
+            }else { // exists parameters
+                int size = userDefParamsList.size();
+                StringBuilder parameter = new StringBuilder();
+                parameter.append("(");
+                for (int i = 0 ;i < size - 1; i++){
+                    parameter.append("?,");
+                }
+                parameter.append("?)");
+                method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
+            }
 
-                    logger.info("call method : {}",method);
-                    // call method
-                    stmt = connection.prepareCall(method);
-                    if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
-                        stmt.setQueryTimeout(taskProps.getTaskTimeout());
-                    }
-                    Map<Integer,Property> outParameterMap = new HashMap<>();
-                    if (userDefParamsList != null && userDefParamsList.size() > 0){
-                        int index = 1;
-                        for (Property property : userDefParamsList){
-                            logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
-                                    ,property.getProp(),
-                                    property.getDirect(),
-                                    property.getType(),
-                                    property.getValue());
-                            // set parameters
-                            if (property.getDirect().equals(Direct.IN)){
-                                ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
-                            }else if (property.getDirect().equals(Direct.OUT)){
-                                setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
-                                property.setValue(paramsMap.get(property.getProp()).getValue());
-                                outParameterMap.put(index,property);
-                            }
-                            index++;
-                        }
+            logger.info("call method : {}",method);
+            // call method
+            stmt = connection.prepareCall(method);
+            if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
+                stmt.setQueryTimeout(taskProps.getTaskTimeout());
+            }
+            Map<Integer,Property> outParameterMap = new HashMap<>();
+            if (userDefParamsList != null && userDefParamsList.size() > 0){
+                int index = 1;
+                for (Property property : userDefParamsList){
+                    logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
+                            ,property.getProp(),
+                            property.getDirect(),
+                            property.getType(),
+                            property.getValue());
+                    // set parameters
+                    if (property.getDirect().equals(Direct.IN)){
+                        ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
+                    }else if (property.getDirect().equals(Direct.OUT)){
+                        setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
+                        property.setValue(paramsMap.get(property.getProp()).getValue());
+                        outParameterMap.put(index,property);
                     }
+                    index++;
+                }
+            }
 
-                    stmt.executeUpdate();
-
-                    /**
-                     *  print the output parameters to the log
-                     */
-                    Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
-                    while (iter.hasNext()){
-                        Map.Entry<Integer, Property> en = iter.next();
-
-                        int index = en.getKey();
-                        Property property = en.getValue();
-                        String prop = property.getProp();
-                        DataType dataType = property.getType();
-
-                        if (dataType.equals(DataType.VARCHAR)){
-                            String value = stmt.getString(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.INTEGER)){
-                            int value = stmt.getInt(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.LONG)){
-                            long value = stmt.getLong(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.FLOAT)){
-                            float value = stmt.getFloat(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.DOUBLE)){
-                            double value = stmt.getDouble(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.DATE)){
-                            Date value = stmt.getDate(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.TIME)){
-                            Time value = stmt.getTime(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.TIMESTAMP)){
-                            Timestamp value = stmt.getTimestamp(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }else if (dataType.equals(DataType.BOOLEAN)){
-                            boolean value = stmt.getBoolean(index);
-                            logger.info("out prameter key : {} , value : {}",prop,value);
-                        }
-                    }
+            stmt.executeUpdate();
+
+            /**
+             *  print the output parameters to the log
+             */
+            Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
+            while (iter.hasNext()){
+                Map.Entry<Integer, Property> en = iter.next();
+
+                int index = en.getKey();
+                Property property = en.getValue();
+                String prop = property.getProp();
+                DataType dataType = property.getType();
+                // get output parameter
+                getOutputParameter(stmt, index, prop, dataType);
+            }
 
-                    exitStatusCode = 0;
-                }catch (Exception e){
-                    logger.error(e.getMessage(),e);
+            exitStatusCode = 0;
+        }catch (Exception e){
+            logger.error(e.getMessage(),e);
+            exitStatusCode = -1;
+            throw new RuntimeException(String.format("process interrupted. exit status code is %d",exitStatusCode));
+        }
+        finally {
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
                     exitStatusCode = -1;
-                    throw new RuntimeException("process interrupted. exit status code is : "  + exitStatusCode);
+                    logger.error(e.getMessage(),e);
                 }
-                finally {
-                    if (stmt != null) {
-                        try {
-                            stmt.close();
-                        } catch (SQLException e) {
-                            exitStatusCode = -1;
-                            logger.error(e.getMessage(),e);
-                        }
-                    }
-                    if (connection != null) {
-                        try {
-                            connection.close();
-                        } catch (SQLException e) {
-                            exitStatusCode = -1;
-                            logger.error(e.getMessage(), e);
-                        }
-                    }
+            }
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    exitStatusCode = -1;
+                    logger.error(e.getMessage(), e);
                 }
             }
         }
     }
 
+    /**
+     * get output parameter
+     * @param stmt
+     * @param index
+     * @param prop
+     * @param dataType
+     * @throws SQLException
+     */
+    private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+        switch (dataType){
+            case VARCHAR:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getString(index));
+                break;
+            case INTEGER:
+                logger.info("out prameter key : {} , value : {}", prop, stmt.getInt(index));
+                break;
+            case LONG:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getLong(index));
+                break;
+            case FLOAT:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getFloat(index));
+                break;
+            case DOUBLE:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getDouble(index));
+                break;
+            case DATE:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getDate(index));
+                break;
+            case TIME:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getTime(index));
+                break;
+            case TIMESTAMP:
+                logger.info("out prameter key : {} , value : {}",prop,stmt.getTimestamp(index));
+                break;
+            case BOOLEAN:
+                logger.info("out prameter key : {} , value : {}",prop, stmt.getBoolean(index));
+                break;
+            default:
+                break;
+        }
+    }
+
     @Override
     public AbstractParameters getParameters() {
         return procedureParameters;
@@ -277,61 +284,61 @@ public class ProcedureTask extends AbstractTask {
      * @throws Exception
      */
     private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{
-        if (dataType.equals(DataType.VARCHAR)){
+        if (dataType.equals(VARCHAR)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.VARCHAR);
             }else {
                 stmt.registerOutParameter(index, Types.VARCHAR, value);
             }
 
-        }else if (dataType.equals(DataType.INTEGER)){
+        }else if (dataType.equals(INTEGER)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.INTEGER);
             }else {
                 stmt.registerOutParameter(index, Types.INTEGER, value);
             }
 
-        }else if (dataType.equals(DataType.LONG)){
+        }else if (dataType.equals(LONG)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index,Types.INTEGER);
             }else {
                 stmt.registerOutParameter(index,Types.INTEGER ,value);
             }
-        }else if (dataType.equals(DataType.FLOAT)){
+        }else if (dataType.equals(FLOAT)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.FLOAT);
             }else {
                 stmt.registerOutParameter(index, Types.FLOAT,value);
             }
-        }else if (dataType.equals(DataType.DOUBLE)){
+        }else if (dataType.equals(DOUBLE)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.DOUBLE);
             }else {
                 stmt.registerOutParameter(index, Types.DOUBLE , value);
             }
 
-        }else if (dataType.equals(DataType.DATE)){
+        }else if (dataType.equals(DATE)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.DATE);
             }else {
                 stmt.registerOutParameter(index, Types.DATE , value);
             }
 
-        }else if (dataType.equals(DataType.TIME)){
+        }else if (dataType.equals(TIME)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.TIME);
             }else {
                 stmt.registerOutParameter(index, Types.TIME , value);
             }
 
-        }else if (dataType.equals(DataType.TIMESTAMP)){
+        }else if (dataType.equals(TIMESTAMP)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.TIMESTAMP);
             }else {
                 stmt.registerOutParameter(index, Types.TIMESTAMP , value);
             }
 
-        }else if (dataType.equals(DataType.BOOLEAN)){
+        }else if (dataType.equals(BOOLEAN)){
             if (StringUtils.isEmpty(value)){
                 stmt.registerOutParameter(index, Types.BOOLEAN);
             }else {

+ 23 - 49
escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java

@@ -20,27 +20,18 @@ package cn.escheduler.server.worker.task.python;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
 import cn.escheduler.common.task.python.PythonParameters;
-import cn.escheduler.common.utils.CommonUtils;
 import cn.escheduler.common.utils.JSONUtils;
 import cn.escheduler.common.utils.ParameterUtils;
 import cn.escheduler.dao.DaoFactory;
 import cn.escheduler.dao.ProcessDao;
-import cn.escheduler.dao.model.ProcessInstance;
 import cn.escheduler.server.utils.ParamUtils;
 import cn.escheduler.server.worker.task.AbstractTask;
 import cn.escheduler.server.worker.task.PythonCommandExecutor;
 import cn.escheduler.server.worker.task.TaskProps;
 import org.slf4j.Logger;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
+
 import java.util.Map;
-import java.util.Set;
 
 /**
  *  python task
@@ -57,7 +48,10 @@ public class PythonTask extends AbstractTask {
    */
   private String taskDir;
 
-  private PythonCommandExecutor pythonProcessTask;
+  /**
+   * python command executor
+   */
+  private PythonCommandExecutor pythonCommandExecutor;
 
   /**
    * process database access
@@ -70,10 +64,15 @@ public class PythonTask extends AbstractTask {
 
     this.taskDir = taskProps.getTaskDir();
 
-    this.pythonProcessTask = new PythonCommandExecutor(this::logHandle,
-            taskProps.getTaskDir(), taskProps.getTaskAppId(),
-            taskProps.getTenantCode(), null, taskProps.getTaskStartTime(),
-            taskProps.getTaskTimeout(), logger);
+    this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
+            taskProps.getTaskDir(),
+            taskProps.getTaskAppId(),
+            taskProps.getTaskInstId(),
+            taskProps.getTenantCode(),
+            taskProps.getEnvFile(),
+            taskProps.getTaskStartTime(),
+            taskProps.getTaskTimeout(),
+            logger);
     this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
   }
 
@@ -92,9 +91,9 @@ public class PythonTask extends AbstractTask {
   public void handle() throws Exception {
     try {
       //  construct process
-      exitStatusCode = pythonProcessTask.run(buildCommand(), processDao);
+      exitStatusCode = pythonCommandExecutor.run(buildCommand(), processDao);
     } catch (Exception e) {
-      logger.error("python process exception", e);
+      logger.error("python task failure", e);
       exitStatusCode = -1;
     }
   }
@@ -102,7 +101,7 @@ public class PythonTask extends AbstractTask {
   @Override
   public void cancelApplication(boolean cancelApplication) throws Exception {
     // cancel process
-    pythonProcessTask.cancelApplication();
+    pythonCommandExecutor.cancelApplication();
   }
 
   /**
@@ -111,21 +110,7 @@ public class PythonTask extends AbstractTask {
    * @throws Exception
    */
   private String buildCommand() throws Exception {
-    // generate scripts
-//    String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId());
-//    Path path = new File(fileName).toPath();
-
-
-
-//    if (Files.exists(path)) {
-//      return fileName;
-//    }
-
-    String rawScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
-
-
-    // find process instance by task id
-    ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+    String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
 
     /**
      *  combining local and global parameters
@@ -133,27 +118,16 @@ public class PythonTask extends AbstractTask {
     Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
             taskProps.getDefinedParams(),
             pythonParameters.getLocalParametersMap(),
-            processInstance.getCmdTypeIfComplement(),
-            processInstance.getScheduleTime());
+            taskProps.getCmdTypeIfComplement(),
+            taskProps.getScheduleTime());
     if (paramsMap != null){
-      rawScript = ParameterUtils.convertParameterPlaceholders(rawScript, ParamUtils.convert(paramsMap));
+      rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
     }
 
-
-//    pythonParameters.setRawScript(rawScript);
-
-    logger.info("raw script : {}", pythonParameters.getRawScript());
+    logger.info("raw python script : {}", pythonParameters.getRawScript());
     logger.info("task dir : {}", taskDir);
 
-//    Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
-//    FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-//
-//    Files.createFile(path, attr);
-//
-//    Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
-//
-//    return fileName;
-    return rawScript;
+    return rawPythonScript;
   }
 
   @Override

+ 17 - 15
escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java

@@ -54,7 +54,7 @@ public class ShellTask extends AbstractTask {
    */
   private String taskDir;
 
-  private ShellCommandExecutor processTask;
+  private ShellCommandExecutor shellCommandExecutor;
 
   /**
    * process database access
@@ -62,15 +62,19 @@ public class ShellTask extends AbstractTask {
   private ProcessDao processDao;
 
 
-  public ShellTask(TaskProps props, Logger logger) {
-    super(props, logger);
+  public ShellTask(TaskProps taskProps, Logger logger) {
+    super(taskProps, logger);
 
-    this.taskDir = props.getTaskDir();
+    this.taskDir = taskProps.getTaskDir();
 
-    this.processTask = new ShellCommandExecutor(this::logHandle,
-        props.getTaskDir(), props.getTaskAppId(),
-        props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
-        props.getTaskTimeout(), logger);
+    this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(),
+            taskProps.getTaskAppId(),
+            taskProps.getTaskInstId(),
+            taskProps.getTenantCode(),
+            taskProps.getEnvFile(),
+            taskProps.getTaskStartTime(),
+            taskProps.getTaskTimeout(),
+            logger);
     this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
   }
 
@@ -89,9 +93,9 @@ public class ShellTask extends AbstractTask {
   public void handle() throws Exception {
     try {
       // construct process
-      exitStatusCode = processTask.run(buildCommand(), processDao);
+      exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
     } catch (Exception e) {
-      logger.error(e.getMessage(), e);
+      logger.error("shell task failure", e);
       exitStatusCode = -1;
     }
   }
@@ -99,7 +103,7 @@ public class ShellTask extends AbstractTask {
   @Override
   public void cancelApplication(boolean cancelApplication) throws Exception {
     // cancel process
-    processTask.cancelApplication();
+    shellCommandExecutor.cancelApplication();
   }
 
   /**
@@ -118,8 +122,6 @@ public class ShellTask extends AbstractTask {
 
     String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
 
-    // find process instance by task id
-    ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
 
     /**
      *  combining local and global parameters
@@ -127,8 +129,8 @@ public class ShellTask extends AbstractTask {
     Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
             taskProps.getDefinedParams(),
             shellParameters.getLocalParametersMap(),
-            processInstance.getCmdTypeIfComplement(),
-            processInstance.getScheduleTime());
+            taskProps.getCmdTypeIfComplement(),
+            taskProps.getScheduleTime());
     if (paramsMap != null){
       script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
     }

+ 2 - 4
escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java

@@ -66,8 +66,6 @@ public class SparkTask extends AbstractYarnTask {
 
     if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
       String args = sparkParameters.getMainArgs();
-      // get process instance by task instance id
-      ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
 
       /**
        *  combining local and global parameters
@@ -75,8 +73,8 @@ public class SparkTask extends AbstractYarnTask {
       Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
               taskProps.getDefinedParams(),
               sparkParameters.getLocalParametersMap(),
-              processInstance.getCmdTypeIfComplement(),
-              processInstance.getScheduleTime());
+              taskProps.getCmdTypeIfComplement(),
+              taskProps.getScheduleTime());
       if (paramsMap != null ){
         args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
       }

+ 135 - 124
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task.sql;
 
 import cn.escheduler.alert.utils.MailUtils;
 import cn.escheduler.common.Constants;
-import cn.escheduler.common.enums.DbType;
 import cn.escheduler.common.enums.ShowType;
 import cn.escheduler.common.enums.TaskTimeoutStrategy;
 import cn.escheduler.common.enums.UdfType;
@@ -44,8 +43,6 @@ import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.EnumUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 
 import java.sql.*;
@@ -54,7 +51,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static cn.escheduler.common.utils.PropertyUtils.getString;
+import static cn.escheduler.common.Constants.*;
+import static cn.escheduler.common.enums.DbType.*;
 
 /**
  *  sql task
@@ -76,12 +74,22 @@ public class SqlTask extends AbstractTask {
      */
     private AlertDao alertDao;
 
+    /**
+     * datasource
+     */
+    private DataSource dataSource;
+
+    /**
+     * base datasource
+     */
+    private BaseDataSource baseDataSource;
+
 
-    public SqlTask(TaskProps props, Logger logger) {
-        super(props, logger);
+    public SqlTask(TaskProps taskProps, Logger logger) {
+        super(taskProps, logger);
 
         logger.info("sql task params {}", taskProps.getTaskParams());
-        this.sqlParameters = JSONObject.parseObject(props.getTaskParams(), SqlParameters.class);
+        this.sqlParameters = JSONObject.parseObject(taskProps.getTaskParams(), SqlParameters.class);
 
         if (!sqlParameters.checkParameters()) {
             throw new RuntimeException("sql task params is not valid");
@@ -97,75 +105,73 @@ public class SqlTask extends AbstractTask {
         Thread.currentThread().setName(threadLoggerInfoName);
         logger.info(sqlParameters.toString());
         logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
-                sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(),
-                sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), sqlParameters.getConnParams());
-
-        // determine whether there is a data source
+                sqlParameters.getType(),
+                sqlParameters.getDatasource(),
+                sqlParameters.getSql(),
+                sqlParameters.getLocalParams(),
+                sqlParameters.getUdfs(),
+                sqlParameters.getShowType(),
+                sqlParameters.getConnParams());
+
+        // not set data source
         if (sqlParameters.getDatasource() == 0){
-            logger.error("datasource is null");
+            logger.error("datasource id not exists");
             exitStatusCode = -1;
-        }else {
-            List<String> createFuncs = null;
-            DataSource dataSource = processDao.findDataSourceById(sqlParameters.getDatasource());
-            logger.info("datasource name : {} , type : {} , desc : {}  , user_id : {} , parameter : {}",
-                    dataSource.getName(),dataSource.getType(),dataSource.getNote(),
-                    dataSource.getUserId(),dataSource.getConnectionParams());
-
-            if (dataSource != null){
-                Connection con = null;
-                try {
-                    BaseDataSource baseDataSource = null;
-                    if (DbType.MYSQL.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class);
-                        Class.forName(Constants.JDBC_MYSQL_CLASS_NAME);
-                    }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
-                        Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
-                    }else if (DbType.HIVE.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),HiveDataSource.class);
-                        Class.forName(Constants.JDBC_HIVE_CLASS_NAME);
-                    }else if (DbType.SPARK.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class);
-                        Class.forName(Constants.JDBC_SPARK_CLASS_NAME);
-                    }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
-                        Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
-                    }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),OracleDataSource.class);
-                        Class.forName(Constants.JDBC_ORACLE_CLASS_NAME);
-                    }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){
-                        baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SQLServerDataSource.class);
-                        Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME);
-                    }
+            return;
+        }
 
+        dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
+        logger.info("datasource name : {} , type : {} , desc : {}  , user_id : {} , parameter : {}",
+                dataSource.getName(),
+                dataSource.getType(),
+                dataSource.getNote(),
+                dataSource.getUserId(),
+                dataSource.getConnectionParams());
 
-                    // ready to execute SQL and parameter entity Map
-                    SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
-                    List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()).orElse(new ArrayList<>())
-                            .stream()
-                            .map(this::getSqlAndSqlParamsMap)
-                            .collect(Collectors.toList());
-                    List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()).orElse(new ArrayList<>())
-                            .stream()
-                            .map(this::getSqlAndSqlParamsMap)
-                            .collect(Collectors.toList());
-
-                    if(EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) && StringUtils.isNotEmpty(sqlParameters.getUdfs())){
-                        List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs());
-                        createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
-                    }
+        if (dataSource == null){
+            logger.error("datasource not exists");
+            exitStatusCode = -1;
+            return;
+        }
 
-                    // execute sql task
-                    con = executeFuncAndSql(baseDataSource, mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+        Connection con = null;
+        List<String> createFuncs = null;
+        try {
+            // load class
+            DataSourceFactory.loadClass(dataSource.getType());
+            // get datasource
+            baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
+                    dataSource.getConnectionParams());
+
+            // ready to execute SQL and parameter entity Map
+            SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
+            List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
+                    .orElse(new ArrayList<>())
+                    .stream()
+                    .map(this::getSqlAndSqlParamsMap)
+                    .collect(Collectors.toList());
+            List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
+                    .orElse(new ArrayList<>())
+                    .stream()
+                    .map(this::getSqlAndSqlParamsMap)
+                    .collect(Collectors.toList());
+
+            // determine if it is UDF
+            boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+                    && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+            if(udfTypeFlag){
+                List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs());
+                createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
+            }
 
-                } finally {
-                    if (con != null) {
-                        try {
-                            con.close();
-                        } catch (SQLException e) {
-                            throw e;
-                        }
-                    }
+            // execute sql task
+            con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+        } finally {
+            if (con != null) {
+                try {
+                    con.close();
+                } catch (SQLException e) {
+                    throw e;
                 }
             }
         }
@@ -180,13 +186,13 @@ public class SqlTask extends AbstractTask {
         StringBuilder sqlBuilder = new StringBuilder();
 
         // find process instance by task id
-        ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+
 
         Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
                 taskProps.getDefinedParams(),
                 sqlParameters.getLocalParametersMap(),
-                processInstance.getCmdTypeIfComplement(),
-                processInstance.getScheduleTime());
+                taskProps.getCmdTypeIfComplement(),
+                taskProps.getScheduleTime());
 
         // spell SQL according to the final user-defined variable
         if(paramsMap == null){
@@ -195,14 +201,15 @@ public class SqlTask extends AbstractTask {
         }
 
         if (StringUtils.isNotEmpty(sqlParameters.getTitle())){
-            String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), ParamUtils.convert(paramsMap));
-            logger.info(title);
+            String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
+                    ParamUtils.convert(paramsMap));
+            logger.info("SQL tile : {}",title);
             sqlParameters.setTitle(title);
         }
 
         // special characters need to be escaped, ${} needs to be escaped
         String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
-        setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap);
+        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
 
         // replace the ${} of the SQL statement with the Placeholder
         String formatSql = sql.replaceAll(rgex,"?");
@@ -219,47 +226,45 @@ public class SqlTask extends AbstractTask {
     }
 
     /**
-     *  execute sql
-     * @param baseDataSource
+     * execute sql
      * @param mainSqlBinds
      * @param preStatementsBinds
      * @param postStatementsBinds
      * @param createFuncs
+     * @return
      */
-    public Connection executeFuncAndSql(BaseDataSource baseDataSource,
-                                        SqlBinds mainSqlBinds,
+    public Connection executeFuncAndSql(SqlBinds mainSqlBinds,
                                         List<SqlBinds> preStatementsBinds,
                                         List<SqlBinds> postStatementsBinds,
                                         List<String> createFuncs){
         Connection connection = null;
         try {
-            if (CommonUtils.getKerberosStartupState())  {
-                System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
-                        getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
-                Configuration configuration = new Configuration();
-                configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-                UserGroupInformation.setConfiguration(configuration);
-                UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
-                        getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
-            }
-            if (DbType.HIVE.name().equals(sqlParameters.getType())) {
+            // if upload resource is HDFS and kerberos startup
+            CommonUtils.loadKerberosConf();
+
+            // if hive , load connection params if exists
+            if (HIVE == dataSource.getType()) {
                 Properties paramProp = new Properties();
-                paramProp.setProperty("user", baseDataSource.getUser());
-                paramProp.setProperty("password", baseDataSource.getPassword());
-                Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), Constants.SEMICOLON,"hiveconf:");
+                paramProp.setProperty(USER, baseDataSource.getUser());
+                paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
+                Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
+                        SEMICOLON,
+                        HIVE_CONF);
                 if(connParamMap != null){
                     paramProp.putAll(connParamMap);
                 }
 
-                connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),paramProp);
+                connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+                        paramProp);
             }else{
                 connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
-                        baseDataSource.getUser(), baseDataSource.getPassword());
+                        baseDataSource.getUser(),
+                        baseDataSource.getPassword());
             }
 
             // create temp function
             if (CollectionUtils.isNotEmpty(createFuncs)) {
-                try (Statement  funcStmt = connection.createStatement()) {
+                try (Statement funcStmt = connection.createStatement()) {
                     for (String createFunc : createFuncs) {
                         logger.info("hive create function sql: {}", createFunc);
                         funcStmt.execute(createFunc);
@@ -270,7 +275,7 @@ public class SqlTask extends AbstractTask {
             for (SqlBinds sqlBind: preStatementsBinds) {
                 try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
                     int result = stmt.executeUpdate();
-                    logger.info("pre statement execute result: " + result + ", for sql: "  + sqlBind.getSql());
+                    logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
                 }
             }
 
@@ -278,7 +283,7 @@ public class SqlTask extends AbstractTask {
                 // decide whether to executeQuery or executeUpdate based on sqlType
                 if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
                     // query statements need to be convert to JsonArray and inserted into Alert to send
-                    JSONArray array = new JSONArray();
+                    JSONArray resultJSONArray = new JSONArray();
                     ResultSet resultSet = stmt.executeQuery();
                     ResultSetMetaData md = resultSet.getMetaData();
                     int num = md.getColumnCount();
@@ -288,21 +293,19 @@ public class SqlTask extends AbstractTask {
                         for (int i = 1; i <= num; i++) {
                             mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
                         }
-                        array.add(mapOfColValues);
+                        resultJSONArray.add(mapOfColValues);
                     }
 
-                    logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
-
-                    // send as an attachment
-                    if (StringUtils.isEmpty(sqlParameters.getShowType())) {
-                        logger.info("showType is empty,don't need send email");
-                    } else {
-                        if (array.size() > 0) {
-                            if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
-                                sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
-                            }else{
-                                sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
-                            }
+                    logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+
+                    // if there is a result set
+                    if (resultJSONArray.size() > 0) {
+                        if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
+                            sendAttachment(sqlParameters.getTitle(),
+                                    JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+                        }else{
+                            sendAttachment(taskProps.getNodeName() + " query resultsets ",
+                                    JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
                         }
                     }
 
@@ -310,7 +313,7 @@ public class SqlTask extends AbstractTask {
 
                 } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
                     // non query statement
-                    int result = stmt.executeUpdate();
+                    stmt.executeUpdate();
                     exitStatusCode = 0;
                 }
             }
@@ -318,7 +321,7 @@ public class SqlTask extends AbstractTask {
             for (SqlBinds sqlBind: postStatementsBinds) {
                 try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
                     int result = stmt.executeUpdate();
-                    logger.info("post statement execute result: " + result + ", for sql: "  + sqlBind.getSql());
+                    logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
                 }
             }
         } catch (Exception e) {
@@ -328,9 +331,19 @@ public class SqlTask extends AbstractTask {
         return connection;
     }
 
+    /**
+     * preparedStatement bind
+     * @param connection
+     * @param sqlBinds
+     * @return
+     * @throws Exception
+     */
     private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
         PreparedStatement  stmt = connection.prepareStatement(sqlBinds.getSql());
-        if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
+        // is the timeout set
+        boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED ||
+                taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
+        if(timeoutFlag){
             stmt.setQueryTimeout(taskProps.getTaskTimeout());
         }
         Map<Integer, Property> params = sqlBinds.getParamsMap();
@@ -340,7 +353,7 @@ public class SqlTask extends AbstractTask {
                 ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue());
             }
         }
-        logger.info("prepare statement replace sql:{}",stmt.toString());
+        logger.info("prepare statement replace sql : {} ",stmt.toString());
         return stmt;
     }
 
@@ -354,9 +367,6 @@ public class SqlTask extends AbstractTask {
         //  process instance
         ProcessInstance instance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
 
-        // process define
-        ProcessDefinition processDefine = processDao.findProcessDefineById(instance.getProcessDefinitionId());
-
         List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
 
         // receiving group list
@@ -367,7 +377,7 @@ public class SqlTask extends AbstractTask {
         // custom receiver
         String receivers = sqlParameters.getReceivers();
         if (StringUtils.isNotEmpty(receivers)){
-            String[] splits = receivers.split(Constants.COMMA);
+            String[] splits = receivers.split(COMMA);
             for (String receiver : splits){
                 receviersList.add(receiver.trim());
             }
@@ -378,16 +388,17 @@ public class SqlTask extends AbstractTask {
         // Custom Copier
         String receiversCc = sqlParameters.getReceiversCc();
         if (StringUtils.isNotEmpty(receiversCc)){
-            String[] splits = receiversCc.split(Constants.COMMA);
+            String[] splits = receiversCc.split(COMMA);
             for (String receiverCc : splits){
                 receviersCcList.add(receiverCc.trim());
             }
         }
 
-        String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
+        String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim();
         if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
-            Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
-            if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
+            Map<String, Object> mailResult = MailUtils.sendMails(receviersList,
+                    receviersCcList, title, content, ShowType.valueOf(showTypeName));
+            if(!(Boolean) mailResult.get(STATUS)){
                 throw new RuntimeException("send mail failed!");
             }
         }else{
@@ -425,7 +436,7 @@ public class SqlTask extends AbstractTask {
     public void printReplacedSql(String content, String formatSql,String rgex, Map<Integer,Property> sqlParamsMap){
         //parameter print style
         logger.info("after replace sql , preparing : {}" , formatSql);
-        StringBuffer logPrint = new StringBuffer("replaced sql , parameters:");
+        StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
         for(int i=1;i<=sqlParamsMap.size();i++){
             logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
         }