Browse Source

[Improvement][Task]Check the task plugin configuration when the worke… (#6184)

* [Improvement][Task]Check the task plugin configuration when the worker starts
Kirs 3 years ago
parent
commit
7029062f4c

+ 8 - 2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@@ -35,10 +35,13 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
 import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
 import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.Set;
 
 import javax.annotation.PostConstruct;
@@ -180,12 +183,15 @@ public class WorkerServer implements IStoppable {
             taskPluginManagerConfig.setMavenLocalRepository(workerConfig.getMavenLocalRepository().trim());
         }
 
-        DolphinPluginLoader alertPluginLoader = new DolphinPluginLoader(taskPluginManagerConfig, ImmutableList.of(taskPluginManager));
+        DolphinPluginLoader taskPluginLoader = new DolphinPluginLoader(taskPluginManagerConfig, ImmutableList.of(taskPluginManager));
         try {
-            alertPluginLoader.loadPlugins();
+            taskPluginLoader.loadPlugins();
         } catch (Exception e) {
             throw new RuntimeException("Load Task Plugin Failed !", e);
         }
+        if (MapUtils.isEmpty(taskPluginManager.getTaskChannelMap())) {
+            throw new PluginNotFoundException("Task Plugin Not Found,Please Check Config File");
+        }
     }
 
     public void close(String cause) {

+ 13 - 6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskChannel;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
@@ -61,7 +62,7 @@ import org.slf4j.LoggerFactory;
 import com.github.rholder.retry.RetryException;
 
 /**
- *  task scheduler thread
+ * task scheduler thread
  */
 public class TaskExecuteThread implements Runnable, Delayed {
 
@@ -103,7 +104,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
     private TaskPluginManager taskPluginManager;
 
     /**
-     *  constructor
+     * constructor
+     *
      * @param taskExecutionContext taskExecutionContext
      * @param taskCallbackService taskCallbackService
      */
@@ -128,7 +130,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
     @Override
     public void run() {
 
-        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(),taskExecutionContext.getProcessInstanceId());
+        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
         try {
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
             // check if the OS user exists
@@ -161,6 +163,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
                     taskExecutionContext.getTaskInstanceId()));
 
             TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
+            if (null == taskChannel) {
+                throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
+            }
 
             //TODO Temporary operation, To be adjusted
             TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class);
@@ -229,6 +234,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
     /**
      * get global paras map
+     *
      * @return map
      */
     private Map<String, String> getGlobalParamsMap() {
@@ -251,7 +257,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
             try {
                 task.cancelApplication(true);
             } catch (Exception e) {
-                logger.error(e.getMessage(),e);
+                logger.error(e.getMessage(), e);
             }
         }
     }
@@ -270,7 +276,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
         Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
 
-        for (Map.Entry<String,String> resource : resEntries) {
+        for (Map.Entry<String, String> resource : resEntries) {
             String fullName = resource.getKey();
             String tenantCode = resource.getValue();
             File resFile = new File(execLocalPath, fullName);
@@ -282,7 +288,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
                     logger.info("get resource file from hdfs :{}", resHdfsPath);
                     HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
                 } catch (Exception e) {
-                    logger.error(e.getMessage(),e);
+                    logger.error(e.getMessage(), e);
                     throw new RuntimeException(e.getMessage());
                 }
             } else {
@@ -329,6 +335,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
     /**
      * get current TaskExecutionContext
+     *
      * @return TaskExecutionContext
      */
     public TaskExecutionContext getTaskExecutionContext() {

+ 31 - 0
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.exception;
+
+public class PluginNotFoundException extends RuntimeException {
+
+    private static final long serialVersionUID = -5487812425126112159L;
+
+    public PluginNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PluginNotFoundException(String message) {
+        super(message);
+    }
+}

+ 13 - 0
dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java

@@ -61,6 +61,8 @@ public class StandaloneServer {
 
         startAlertServer();
 
+        setTaskPlugin();
+
         new SpringApplicationBuilder(
                 ApiApplicationServer.class,
                 MasterServer.class,
@@ -114,4 +116,15 @@ public class StandaloneServer {
         final ScriptRunner runner = new ScriptRunner(ds.getConnection(), true, true);
         runner.runScript(new FileReader("sql/dolphinscheduler_h2.sql"));
     }
+
+    private static void setTaskPlugin() {
+        final Path taskPluginPath = Paths.get(
+                StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
+                "../../../dolphinscheduler-task-plugin/dolphinscheduler-task-shell/pom.xml"
+        ).toAbsolutePath();
+        if (Files.exists(taskPluginPath)) {
+            System.setProperty("task.plugin.binding", taskPluginPath.toString());
+            System.setProperty("task.plugin.dir", "");
+        }
+    }
 }