Browse Source

Merge branch 'dev' into fixbug-#2439

dailidong 5 years ago
parent
commit
8b00582e75

+ 5 - 9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java

@@ -106,14 +106,12 @@ public class DataAnalysisService extends BaseService{
         List<ExecuteStatusCount> taskInstanceStateCounts =
                 taskInstanceMapper.countTaskInstanceStateByUser(start, end, projectIds);
 
-        if (taskInstanceStateCounts != null && !taskInstanceStateCounts.isEmpty()) {
+        if (taskInstanceStateCounts != null) {
             TaskCountDto taskCountResult = new TaskCountDto(taskInstanceStateCounts);
             result.put(Constants.DATA_LIST, taskCountResult);
             putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.TASK_INSTANCE_STATE_COUNT_ERROR);
         }
-        return  result;
+        return result;
     }
 
     private void putErrorRequestParamsMsg(Map<String, Object> result) {
@@ -153,14 +151,12 @@ public class DataAnalysisService extends BaseService{
                 processInstanceMapper.countInstanceStateByUser(start, end,
                         projectIdArray);
 
-        if (processInstanceStateCounts != null && !processInstanceStateCounts.isEmpty()) {
+        if (processInstanceStateCounts != null) {
             TaskCountDto taskCountResult = new TaskCountDto(processInstanceStateCounts);
             result.put(Constants.DATA_LIST, taskCountResult);
             putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.COUNT_PROCESS_INSTANCE_STATE_ERROR);
         }
-        return  result;
+        return result;
     }
 
 
@@ -234,7 +230,7 @@ public class DataAnalysisService extends BaseService{
         // count error command state
         List<CommandCount> errorCommandStateCounts =
                 errorCommandMapper.countCommandState(
-                         start, end, projectIdArray);
+                        start, end, projectIdArray);
 
         //
         Map<CommandType,Map<String,Integer>> dataMap = new HashMap<>();

+ 12 - 13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@@ -225,20 +225,14 @@ public class ExecutorService extends BaseService{
                 if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                     putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                 } else {
-                    processInstance.setCommandType(CommandType.STOP);
-                    processInstance.addHistoryCmd(CommandType.STOP);
-                    processService.updateProcessInstance(processInstance);
-                    result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_STOP);
+                    result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
                 }
                 break;
             case PAUSE:
                 if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                     putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
                 } else {
-                    processInstance.setCommandType(CommandType.PAUSE);
-                    processInstance.addHistoryCmd(CommandType.PAUSE);
-                    processService.updateProcessInstance(processInstance);
-                    result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_PAUSE);
+                    result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
                 }
                 break;
             default:
@@ -308,22 +302,27 @@ public class ExecutorService extends BaseService{
     }
 
     /**
-     * update process instance state
+     *  prepare to update process instance command type and status
      *
-     * @param processInstanceId process instance id
+     * @param processInstance process instance
+     * @param commandType command type
      * @param executionStatus execute status
      * @return update result
      */
-    private Map<String, Object> updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
+    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
         Map<String, Object> result = new HashMap<>(5);
 
-        int update = processService.updateProcessInstanceState(processInstanceId, executionStatus);
+        processInstance.setCommandType(commandType);
+        processInstance.addHistoryCmd(commandType);
+        processInstance.setState(executionStatus);
+        int update = processService.updateProcessInstance(processInstance);
+
+        // determine whether the process is normal
         if (update > 0) {
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
         }
-
         return result;
     }
 

+ 0 - 7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@@ -114,9 +114,6 @@ public class DataAnalysisServiceTest {
         Map<String, Object> result = dataAnalysisService.countTaskStateByProject(user, 2, startDate, endDate);
         Assert.assertTrue(result.isEmpty());
 
-        // task instance state count error
-        result = dataAnalysisService.countTaskStateByProject(user, 1, startDate, endDate);
-        Assert.assertEquals(Status.TASK_INSTANCE_STATE_COUNT_ERROR,result.get(Constants.STATUS));
 
         //SUCCESS
         Mockito.when(taskInstanceMapper.countTaskInstanceStateByUser(DateUtils.getScheduleDate(startDate),
@@ -137,10 +134,6 @@ public class DataAnalysisServiceTest {
         Map<String, Object> result = dataAnalysisService.countProcessInstanceStateByProject(user,2,startDate,endDate);
         Assert.assertTrue(result.isEmpty());
 
-        //COUNT_PROCESS_INSTANCE_STATE_ERROR
-        result = dataAnalysisService.countProcessInstanceStateByProject(user,1,startDate,endDate);
-        Assert.assertEquals(Status.COUNT_PROCESS_INSTANCE_STATE_ERROR,result.get(Constants.STATUS));
-
         //SUCCESS
         Mockito.when(processInstanceMapper.countInstanceStateByUser(DateUtils.getScheduleDate(startDate),
                 DateUtils.getScheduleDate(endDate), new Integer[]{1})).thenReturn(getTaskInstanceStateCounts());

+ 10 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@@ -223,4 +223,14 @@ public class ThreadUtils {
         }
         return id + " (" + name + ")";
     }
+
+    /**
+     * sleep
+     * @param millis millis
+     */
+    public static void sleep(final long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (final InterruptedException ignore) {}
+    }
 }

+ 1 - 1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@@ -111,7 +111,7 @@ public class Command {
     /**
      * worker group
      */
-    @TableField(exist = false)
+    @TableField("worker_group")
     private String workerGroup;
 
     public Command() {

+ 4 - 9
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@@ -76,7 +76,8 @@ public class CommandMapperTest {
         //query
         Command actualCommand = commandMapper.selectById(expectedCommand.getId());
 
-        assertEquals(expectedCommand, actualCommand);
+        assertNotNull(actualCommand);
+        assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
     }
 
     /**
@@ -94,7 +95,8 @@ public class CommandMapperTest {
 
         Command actualCommand = commandMapper.selectById(expectedCommand.getId());
 
-        assertEquals(expectedCommand,actualCommand);
+        assertNotNull(actualCommand);
+        assertEquals(expectedCommand.getUpdateTime(),actualCommand.getUpdateTime());
 
     }
 
@@ -127,13 +129,6 @@ public class CommandMapperTest {
         List<Command> actualCommands = commandMapper.selectList(null);
 
         assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
-
-        for (Command actualCommand : actualCommands){
-            Command expectedCommand = commandMap.get(actualCommand.getId());
-            if (expectedCommand != null){
-                assertEquals(expectedCommand,actualCommand);
-            }
-        }
     }
 
     /**

+ 2 - 3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -122,9 +123,7 @@ public class TaskPriorityQueueConsumer extends Thread{
                 result =  dispatcher.dispatch(executionContext);
             } catch (ExecuteException e) {
                 logger.error("dispatch error",e);
-                try {
-                    Thread.sleep(SLEEP_TIME_MILLIS);
-                } catch (InterruptedException e1) {}
+                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
             }
 
             if (result){

+ 2 - 4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -101,10 +102,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
             if (taskInstance != null && ackStatus.typeIsRunning()){
                 break;
             }
-
-            try {
-                Thread.sleep(SLEEP_TIME_MILLIS);
-            } catch (InterruptedException e) {}
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
 
     }

+ 2 - 4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -99,10 +100,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
             if (taskInstance != null && responseStatus.typeIsFinished()){
                 break;
             }
-
-            try {
-                Thread.sleep(SLEEP_TIME_MILLIS);
-            } catch (InterruptedException e) {}
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
     }
 

+ 1 - 1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@@ -914,7 +914,7 @@ public class MasterExecThread implements Runnable {
                     processInstance.getId(), processInstance.getName(),
                     processInstance.getState(), state,
                     processInstance.getCommandType());
-            processInstance.setState(state);
+
             ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
             instance.setState(state);
             instance.setProcessDefinition(processInstance.getProcessDefinition());

+ 4 - 6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@@ -18,11 +18,11 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 
-import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -37,6 +37,8 @@ import org.springframework.stereotype.Service;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
 /**
  *  taks callback service
  */
@@ -98,11 +100,7 @@ public class TaskCallbackService {
         while (Stopper.isRunning()) {
             if (CollectionUtils.isEmpty(masterNodes)) {
                 logger.error("no available master node");
-                try {
-                    Thread.sleep(1000);
-                }catch (Exception e){
-
-                }
+                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
             }else {
                 break;
             }

+ 3 - 2
sql/dolphinscheduler-postgre.sql

@@ -234,7 +234,7 @@ CREATE TABLE t_ds_command (
   dependence varchar(255) DEFAULT NULL ,
   update_time timestamp DEFAULT NULL ,
   process_instance_priority int DEFAULT NULL ,
-  worker_group_id int DEFAULT '-1' ,
+  worker_group varchar(64),
   PRIMARY KEY (id)
 ) ;
 
@@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command (
   update_time timestamp DEFAULT NULL ,
   dependence text ,
   process_instance_priority int DEFAULT NULL ,
-  worker_group_id int DEFAULT '-1' ,
+  worker_group varchar(64),
   message text ,
   PRIMARY KEY (id)
 );
@@ -748,7 +748,7 @@ CREATE SEQUENCE  t_ds_worker_server_id_sequence;
 ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
 
 
+-- Records of t_ds_user?user : admin , password : dolphinscheduler123
 INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
 
 -- Records of t_ds_alertgroup,dolphinscheduler warning group

+ 2 - 2
sql/dolphinscheduler_mysql.sql

@@ -333,7 +333,7 @@ CREATE TABLE `t_ds_command` (
   `dependence` varchar(255) DEFAULT NULL COMMENT 'dependence',
   `update_time` datetime DEFAULT NULL COMMENT 'update time',
   `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
-  `worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
+  `worker_group` varchar(64)  COMMENT 'worker group',
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
@@ -380,7 +380,7 @@ CREATE TABLE `t_ds_error_command` (
   `update_time` datetime DEFAULT NULL COMMENT 'update time',
   `dependence` text COMMENT 'dependence',
   `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority, 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
-  `worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
+  `worker_group` varchar(64)  COMMENT 'worker group',
   `message` text COMMENT 'message',
   PRIMARY KEY (`id`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;