Browse Source

fix #1245, make scanCommand transactional (#1246)

Baoqi Wu 5 years ago
parent
commit
09dec3b7a8

+ 21 - 35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@@ -109,58 +109,44 @@ public class ProcessDao {
 
 
     /**
-     * find one command from command queue, construct process instance
+     * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      * @param logger logger
      * @param host host
      * @param validThreadNum validThreadNum
+     * @param command found command
      * @return process instance
      */
     @Transactional(rollbackFor = Exception.class)
-    public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){
-
-        ProcessInstance processInstance = null;
-        Command command = findOneCommand();
-        if (command == null) {
+    public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
+        ProcessInstance processInstance = constructProcessInstance(command, host);
+        //cannot construct process instance, return null;
+        if(processInstance == null){
+            logger.error("scan command, command parameter is error: %s", command.toString());
+            moveToErrorCommand(command, "process instance is null");
             return null;
         }
-        logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
-
-        try{
-            processInstance = constructProcessInstance(command, host);
-            //cannot construct process instance, return null;
-            if(processInstance == null){
-                logger.error("scan command, command parameter is error: %s", command.toString());
-                delCommandByid(command.getId());
-                saveErrorCommand(command, "process instance is null");
-                return null;
-            }
-            if(!checkThreadNum(command, validThreadNum)){
-                logger.info("there is not enough thread for this command: {}",command.toString() );
-                return setWaitingThreadProcess(command, processInstance);
-            }
-            processInstance.setCommandType(command.getCommandType());
-            processInstance.addHistoryCmd(command.getCommandType());
-            saveProcessInstance(processInstance);
-            this.setSubProcessParam(processInstance);
-            delCommandByid(command.getId());
-            return processInstance;
-        }catch (Exception e){
-            logger.error("scan command error ", e);
-            saveErrorCommand(command, e.toString());
-            delCommandByid(command.getId());
+        if(!checkThreadNum(command, validThreadNum)){
+            logger.info("there is not enough thread for this command: {}",command.toString() );
+            return setWaitingThreadProcess(command, processInstance);
         }
-        return null;
+        processInstance.setCommandType(command.getCommandType());
+        processInstance.addHistoryCmd(command.getCommandType());
+        saveProcessInstance(processInstance);
+        this.setSubProcessParam(processInstance);
+        delCommandByid(command.getId());
+        return processInstance;
     }
 
     /**
-     * save error command
+     * save error command, and delete original command
      * @param command command
      * @param message message
      */
-    private void saveErrorCommand(Command command, String message) {
-
+    @Transactional(rollbackFor = Exception.class)
+    public void moveToErrorCommand(Command command, String message) {
         ErrorCommand errorCommand = new ErrorCommand(command, message);
         this.errorCommandMapper.insert(errorCommand);
+        delCommandByid(command.getId());
     }
 
     /**

+ 15 - 4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
 import org.apache.dolphinscheduler.dao.ProcessDao;
+import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.commons.configuration.Configuration;
@@ -108,10 +109,20 @@ public class MasterSchedulerThread implements Runnable {
                         ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
                         int activeCount = poolExecutor.getActiveCount();
                         // make sure to scan and delete command  table in one transaction
-                        processInstance = processDao.scanCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount);
-                        if (processInstance != null) {
-                            logger.info("start master exex thread , split DAG ...");
-                            masterExecService.execute(new MasterExecThread(processInstance,processDao));
+                        Command command = processDao.findOneCommand();
+                        if (command != null) {
+                            logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
+
+                            try{
+                                processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
+                                if (processInstance != null) {
+                                    logger.info("start master exec thread , split DAG ...");
+                                    masterExecService.execute(new MasterExecThread(processInstance,processDao));
+                                }
+                            }catch (Exception e){
+                                logger.error("scan command error ", e);
+                                processDao.moveToErrorCommand(command, e.toString());
+                            }
                         }
                     }
                 }