baoliang 4 лет назад
Родитель
Сommit
e142b8949d

+ 52 - 28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.executor;
 
-import com.github.rholder.retry.RetryException;
-import org.apache.dolphinscheduler.common.utils.RetryerUtils;
+import org.apache.commons.collections.CollectionUtils;
+
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -31,14 +31,15 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
+
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 
 /**
  *  netty executor manager
@@ -86,11 +87,17 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
      */
     @Override
     public Boolean execute(ExecutionContext context) throws ExecuteException {
-        LinkedList<String> allNodes = new LinkedList<>();
-        Set<String> nodes = getAllNodes(context);
-        if (nodes != null) {
-            allNodes.addAll(nodes);
-        }
+
+        /**
+         *  all nodes
+         */
+        Set<String> allNodes = getAllNodes(context);
+
+        /**
+         * fail nodes
+         */
+        Set<String> failNodeSet = new HashSet<>();
+
         /**
          *  build command accord executeContext
          */
@@ -99,27 +106,31 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
         /**
          * execute task host
          */
-        String startHostAddress = context.getHost().getAddress();
-        // remove start host address and add it to head
-        allNodes.remove(startHostAddress);
-        allNodes.addFirst(startHostAddress);
- 
+        Host host = context.getHost();
         boolean success = false;
-        for (String address : allNodes) {
+        while (!success) {
             try {
-                Host host = Host.of(address);
-                doExecute(host, command);
+                doExecute(host,command);
                 success = true;
                 context.setHost(host);
-                break;
             } catch (ExecuteException ex) {
-                logger.error("retry execute command : {} host : {}", command, address);
+                logger.error(String.format("execute command : %s error", command), ex);
+                try {
+                    failNodeSet.add(host.getAddress());
+                    Set<String> tmpAllIps = new HashSet<>(allNodes);
+                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
+                    if (remained != null && remained.size() > 0) {
+                        host = Host.of(remained.iterator().next());
+                        logger.error("retry execute command : {} host : {}", command, host);
+                    } else {
+                        throw new ExecuteException("fail after try all nodes");
+                    }
+                } catch (Throwable t) {
+                    throw new ExecuteException("fail after try all nodes");
+                }
             }
         }
-        if (!success) {
-            throw new ExecuteException("fail after try all nodes");
-        }
-        
+
         return success;
     }
 
@@ -136,13 +147,33 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
      * @throws ExecuteException if error throws ExecuteException
      */
     private void doExecute(final Host host, final Command command) throws ExecuteException {
-        try {
-            RetryerUtils.retryCall(() -> {
+        /**
+         * retry count, default retry 3: the first send plus up to 3 retries
+         */
+        int retryCount = 3;
+        boolean success = false;
+        // last send failure; attached as the cause if every attempt fails
+        Exception lastError = null;
+        do {
+            try {
                 nettyRemotingClient.send(host, command);
-                return Boolean.TRUE;
-            });
-        } catch (ExecutionException | RetryException e) {
-            throw new ExecuteException(String.format("send command : %s to %s error", command, host), e);
+                success = true;
+            } catch (Exception ex) {
+                logger.error(String.format("send command : %s to %s error", command, host), ex);
+                lastError = ex;
+                retryCount--;
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException interrupted) {
+                    // restore the interrupt flag and stop retrying instead of swallowing it
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        } while (retryCount >= 0 && !success);
+
+        if (!success) {
+            throw new ExecuteException(String.format("send command : %s to %s error", command, host), lastError);
         }
     }
 

+ 4 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@@ -149,6 +149,10 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
                     logger.error("process instance not exists , master task exec thread exit");
                     return true;
                 }
+                if (checkTaskTimeout()) {
+                    this.checkTimeoutFlag = !alertTimeout();
+                    handleTimeoutFailed();
+                }
                 if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
                     cancelTaskInstance();
                     break;

+ 4 - 2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java

@@ -70,14 +70,16 @@ public class UDFUtils {
      */
     private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> udfFuncTenantCodeMap) {
         String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
-
+        String resourceFullName;
         Set<Map.Entry<UdfFunc,String>> entries = udfFuncTenantCodeMap.entrySet();
         for (Map.Entry<UdfFunc,String> entry:entries){
             String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
             if (!uploadPath.startsWith("hdfs:")) {
                 uploadPath = defaultFS + uploadPath;
             }
-            sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName()));
+            resourceFullName = entry.getKey().getResourceName();
+            resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s",resourceFullName);
+            sqls.add(String.format("add jar %s%s", uploadPath, resourceFullName));
         }
 
     }

+ 1 - 0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@@ -109,6 +109,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
         taskExecutionContext.setStartTime(new Date());
         taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
+        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
 
         // local execute path
         String execLocalPath = getExecLocalPath(taskExecutionContext);

+ 13 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@@ -87,6 +87,8 @@ export default {
   'Main class': 'Main class',
   'Main jar package': 'Main jar package',
   'Please enter main jar package': 'Please enter main jar package',
+  'Main package': 'Main package',
+  'Please enter main package': 'Please enter main package',
   'Command-line parameters': 'Command-line parameters',
   'Please enter Command-line parameters': 'Please enter Command-line parameters',
   'Other parameters': 'Other parameters',
@@ -183,9 +185,14 @@ export default {
   'Edit Tenant': 'Edit Tenant',
   'Tenant Code': 'Tenant Code',
   Queue: 'Yarn Queue',
+  'Tenant Name': 'Tenant Name',
   'Please select a queue': 'default is tenant association queue',
   'Please enter the tenant code in English': 'Please enter the tenant code in English',
   'Please enter tenant code in English': 'Please enter tenant code in English',
+  'Please enter tenant code': 'Please enter tenant code',
+  'Please enter tenant Name': 'Please enter tenant Name',
+  'The tenant code. Only letters or a combination of letters and numbers are allowed': 'The tenant code. Only letters or a combination of letters and numbers are allowed',
+  'The tenant code cannot be all numbers': 'The tenant code cannot be all numbers',
   'Edit User': 'Edit User',
   Tenant: 'Tenant',
   Email: 'Email',
@@ -348,9 +355,11 @@ export default {
   Delete: 'Delete',
   'Please enter keyword': 'Please enter keyword',
   'File Upload': 'File Upload',
+  'File ReUpload': 'File ReUpload',
   'Drag the file into the current upload window': 'Drag the file into the current upload window',
   'Drag area upload': 'Drag area upload',
   Upload: 'Upload',
+  'ReUpload File': 'Re-upload file',
   'Please enter file name': 'Please enter file name',
   'Please select the file to upload': 'Please select the file to upload',
   'Resources manage': 'Resources',
@@ -484,6 +493,7 @@ export default {
   'Please enter the IP address separated by commas': 'Please enter the IP address separated by commas',
   'Note: Multiple IP addresses have been comma separated': 'Note: Multiple IP addresses have been comma separated',
   'Failure time': 'Failure time',
+  'Expiration time': 'Expiration time',
   User: 'User',
   'Please enter token': 'Please enter token',
   'Generate token': 'Generate token',
@@ -632,6 +642,8 @@ export default {
   'Socket Timeout be a positive integer': 'Socket Timeout be a positive integer',
   'ms':'ms',
   'Disable': 'Disable',
+  'No resources exist': 'No resources exist',
+  'Please delete all non-existing resources': 'Please delete all non-existing resources',
   'The Worker group no longer exists, please select the correct Worker group!': 'The Worker group no longer exists, please select the correct Worker group!',
   'Please confirm whether the workflow has been saved before downloading': 'Please confirm whether the workflow has been saved before downloading',
   'User name length is between 3 and 39': 'User name length is between 3 and 39',
@@ -649,4 +661,5 @@ export default {
   'Batch move': 'Batch move',
   Version: 'Version',
   'Pre tasks': 'Pre tasks',
+  'The workflow canvas is abnormal and cannot be saved, please recreate': 'The workflow canvas is abnormal and cannot be saved, please recreate'
 }

+ 10 - 1
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -185,9 +185,13 @@ export default {
   'Last heartbeat time': '最后心跳时间',
   'Edit Tenant': '编辑租户',
   'Tenant Code': '租户编码',
-  Queue: 'Yarn 队列',
+  'Tenant Name': '租户名称',
+  Queue: '队列',
   'Please enter the tenant code in English': '请输入租户编码只允许英文',
   'Please enter tenant code in English': '请输入英文租户编码',
+  'Please enter tenant code': '请输入租户编码',
+  'Please enter tenant Name': '请输入租户名称',
+  'The tenant code. Only letters or a combination of letters and numbers are allowed': '租户编码只允许字母或字母与数字组合',
   'Edit User': '编辑用户',
   Tenant: '租户',
   Email: '邮件',
@@ -345,6 +349,7 @@ export default {
   'Drag the file into the current upload window': '请将文件拖拽到当前上传窗口内!',
   'Drag area upload': '拖动区域上传',
   Upload: '上传',
+  'ReUpload File': '重新上传文件',
   'Please enter file name': '请输入文件名',
   'Please select the file to upload': '请选择要上传的文件',
   'Resources manage': '资源中心',
@@ -479,6 +484,7 @@ export default {
   'Please enter the IP address separated by commas': '请输入IP地址多个用英文逗号隔开',
   'Note: Multiple IP addresses have been comma separated': '注意:多个IP地址以英文逗号分割',
   'Failure time': '失效时间',
+  'Expiration time': '失效时间',
   User: '用户',
   'Please enter token': '请输入令牌',
   'Generate token': '生成令牌',
@@ -615,6 +621,8 @@ export default {
   'Custom Script': '自定义脚本',
   'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
   'Successful branch flow and failed branch flow are required': 'conditions节点成功和失败分支流转必填',
+  'No resources exist': '不存在资源',
+  'Please delete all non-existing resources': '请删除所有不存在资源',
   'Unauthorized or deleted resources': '未授权或已删除资源',
   'Please delete all non-existent resources': '请删除所有未授权或已删除资源',
   'Kinship': '工作流关系',
@@ -649,4 +657,5 @@ export default {
   'Batch move': '批量移动',
   Version: '版本',
   'Pre tasks': '前置任务',
+  'The workflow canvas is abnormal and cannot be saved, please recreate': '该工作流画布异常,无法保存,请重新创建'
 }