|
@@ -21,12 +21,8 @@ import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
|
|
import org.apache.dolphinscheduler.remote.command.Command;
|
|
|
import org.apache.dolphinscheduler.remote.command.CommandType;
|
|
|
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
|
|
|
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
|
|
|
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
|
|
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
|
|
import org.apache.dolphinscheduler.remote.utils.Host;
|
|
|
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
|
@@ -98,7 +94,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
|
|
|
/**
|
|
|
* build command accord executeContext
|
|
|
*/
|
|
|
- Command command = buildCommand(context);
|
|
|
+ Command command = context.getCommand();
|
|
|
|
|
|
/**
|
|
|
* execute task host
|
|
@@ -111,14 +107,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
|
|
|
success = true;
|
|
|
context.setHost(host);
|
|
|
} catch (ExecuteException ex) {
|
|
|
- logger.error(String.format("execute context : %s error", context.getContext()), ex);
|
|
|
+ logger.error(String.format("execute command : %s error", command), ex);
|
|
|
try {
|
|
|
failNodeSet.add(host.getAddress());
|
|
|
Set<String> tmpAllIps = new HashSet<>(allNodes);
|
|
|
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
|
|
|
if (remained != null && remained.size() > 0) {
|
|
|
host = Host.of(remained.iterator().next());
|
|
|
- logger.error("retry execute context : {} host : {}", context.getContext(), host);
|
|
|
+ logger.error("retry execute command : {} host : {}", command, host);
|
|
|
} else {
|
|
|
throw new ExecuteException("fail after try all nodes");
|
|
|
}
|
|
@@ -133,53 +129,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
|
|
|
|
|
|
@Override
|
|
|
public void executeDirectly(ExecutionContext context) throws ExecuteException {
|
|
|
- Command command = buildKillCommand(context);
|
|
|
Host host = context.getHost();
|
|
|
- doExecute(host,command);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * build command
|
|
|
- * @param context context
|
|
|
- * @return command
|
|
|
- */
|
|
|
- private Command buildCommand(ExecutionContext context) {
|
|
|
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
|
|
|
- ExecutorType executorType = context.getExecutorType();
|
|
|
- switch (executorType){
|
|
|
- case WORKER:
|
|
|
- TaskExecutionContext taskExecutionContext = context.getContext();
|
|
|
- requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
|
|
|
- break;
|
|
|
- case CLIENT:
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new IllegalArgumentException("invalid executor type : " + executorType);
|
|
|
-
|
|
|
- }
|
|
|
- return requestCommand.convert2Command();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * build command
|
|
|
- * @param context context
|
|
|
- * @return command
|
|
|
- */
|
|
|
- private Command buildKillCommand(ExecutionContext context) {
|
|
|
- TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
|
|
|
- ExecutorType executorType = context.getExecutorType();
|
|
|
- switch (executorType){
|
|
|
- case WORKER:
|
|
|
- TaskExecutionContext taskExecutionContext = context.getContext();
|
|
|
- requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
|
|
|
- break;
|
|
|
- case CLIENT:
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new IllegalArgumentException("invalid executor type : " + executorType);
|
|
|
-
|
|
|
- }
|
|
|
- return requestCommand.convert2Command();
|
|
|
+ doExecute(host, context.getCommand());
|
|
|
}
|
|
|
|
|
|
/**
|