Browse Source

Merge pull request #1 from qiaozhanwei/branch-1.0.0

Branch 1.0.0 merge dev
乔占卫 6 years ago
parent
commit
cd60898a61

+ 1 - 1
escheduler-alert/pom.xml

@@ -4,7 +4,7 @@
     <parent>
         <groupId>cn.analysys</groupId>
         <artifactId>escheduler</artifactId>
-        <version>1.0.0-SNAPSHOT</version>
+        <version>1.0.1-SNAPSHOT</version>
     </parent>
     <artifactId>escheduler-alert</artifactId>
     <packaging>jar</packaging>

+ 1 - 1
escheduler-api/pom.xml

@@ -3,7 +3,7 @@
   <parent>
     <groupId>cn.analysys</groupId>
     <artifactId>escheduler</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.0.1-SNAPSHOT</version>
   </parent>
   <artifactId>escheduler-api</artifactId>
   <packaging>jar</packaging>

+ 1 - 1
escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java

@@ -125,7 +125,7 @@ public class ProcessScheduleJob implements Job {
         }
 
         Command command = new Command();
-        command.setCommandType(CommandType.START_PROCESS);
+        command.setCommandType(CommandType.SCHEDULER);
         command.setExecutorId(schedule.getUserId());
         command.setFailureStrategy(schedule.getFailureStrategy());
         command.setProcessDefinitionId(schedule.getProcessDefinitionId());

+ 1 - 1
escheduler-common/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
 		<artifactId>escheduler</artifactId>
 		<groupId>cn.analysys</groupId>
-		<version>1.0.0-SNAPSHOT</version>
+		<version>1.0.1-SNAPSHOT</version>
 	</parent>
 	<artifactId>escheduler-common</artifactId>
 	<name>escheduler-common</name>

+ 7 - 23
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@@ -70,25 +70,6 @@ public final class Constants {
      */
     public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
 
-    /**
-     * spring.redis.maxIdle
-     */
-    public static final String SPRING_REDIS_MAXIDLE = "spring.redis.maxIdle";
-
-    /**
-     * spring.redis.maxTotal
-     */
-    public static final String SPRING_REDIS_MAXTOTAL = "spring.redis.maxTotal";
-
-    /**
-     * spring.redis.host
-     */
-    public static final String SPRING_REDIS_HOST = "spring.redis.host";
-
-    /**
-     * spring.redis.port
-     */
-    public static final String SPRING_REDIS_PORT = "spring.redis.port";
 
     /**
      * hdfs configuration
@@ -117,9 +98,14 @@ public final class Constants {
     public static final String ESCHEDULER_ENV_PATH = "escheduler.env.path";
 
     /**
-     * escheduler.env.py
+     * escheduler.env.sh
      */
-    public static final String ESCHEDULER_ENV_PY = "escheduler.env.py";
+    public static final String ESCHEDULER_ENV_SH = ".escheduler_env.sh";
+
+    /**
+     * python home
+     */
+    public static final String PYTHON_HOME="PYTHON_HOME";
 
     /**
      * resource.view.suffixs
@@ -255,8 +241,6 @@ public final class Constants {
 
     public static final String SCHEDULER_QUEUE_IMPL = "escheduler.queue.impl";
 
-    public static final String SCHEDULER_QUEUE_REDIS_IMPL = "redis";
-
 
     /**
      * date format of yyyy-MM-dd HH:mm:ss

+ 0 - 7
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java

@@ -46,13 +46,6 @@ public class CommonUtils {
     return envPath;
   }
 
-  /**
-   * @return get the path of Python system environment variables
-   */
-  public static String getPythonSystemEnvPath() {
-    return getString(ESCHEDULER_ENV_PY);
-  }
-
   /**
    * @return get queue implementation name
    */

+ 0 - 1
escheduler-common/src/main/resources/common/common.properties

@@ -18,7 +18,6 @@ hdfs.startup.state=true
 
 # system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
 escheduler.env.path=/opt/.escheduler_env.sh
-escheduler.env.py=/opt/escheduler_env.py
 
 #resource.view.suffixs
 resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml

+ 1 - 1
escheduler-dao/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>cn.analysys</groupId>
 		<artifactId>escheduler</artifactId>
-		<version>1.0.0-SNAPSHOT</version>
+		<version>1.0.1-SNAPSHOT</version>
 	</parent>
 	<artifactId>escheduler-dao</artifactId>
 	<name>escheduler-dao</name>

+ 1 - 1
escheduler-rpc/pom.xml

@@ -4,7 +4,7 @@
     <parent>
         <artifactId>escheduler</artifactId>
         <groupId>cn.analysys</groupId>
-        <version>1.0.0-SNAPSHOT</version>
+        <version>1.0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
escheduler-server/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<artifactId>escheduler</artifactId>
 		<groupId>cn.analysys</groupId>
-		<version>1.0.0-SNAPSHOT</version>
+		<version>1.0.1-SNAPSHOT</version>
 	</parent>
 	<artifactId>escheduler-server</artifactId>
 	<name>escheduler-server</name>

+ 1 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@@ -172,7 +172,7 @@ public class FetchTaskThread implements Runnable{
                             FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
                                     processInstance.getTenantCode(), logger);
 
-
+                            logger.info("task : {} ready to submit to task scheduler thread",taskId);
                             // submit task
                             workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
                         }

+ 55 - 18
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java

@@ -16,12 +16,13 @@
  */
 package cn.escheduler.server.worker.task;
 
+import cn.escheduler.common.Constants;
 import cn.escheduler.common.utils.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -34,6 +35,8 @@ import java.util.function.Consumer;
  */
 public class PythonCommandExecutor extends AbstractCommandExecutor {
 
+    private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);
+
     public static final String PYTHON = "python";
 
 
@@ -63,27 +66,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
      */
     @Override
     protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
-        logger.info("proxy user:{}, work dir:{}", tenantCode, taskDir);
+        logger.info("tenant :{}, work dir:{}", tenantCode, taskDir);
 
         if (!Files.exists(Paths.get(commandFile))) {
             logger.info("generate command file:{}", commandFile);
 
             StringBuilder sb = new StringBuilder(200);
             sb.append("#-*- encoding=utf8 -*-\n");
-            sb.append("import os,sys\n");
-            sb.append("BASEDIR = os.path.dirname(os.path.realpath(__file__))\n");
-            sb.append("os.chdir(BASEDIR)\n");
-
-            if (StringUtils.isNotEmpty(envFile)) {
-                String[] envArray = envFile.split("\\.");
-                if(envArray.length == 2){
-                    String path = envArray[0];
-                    logger.info("path:"+path);
-                    int index =  path.lastIndexOf("/");
-                    sb.append(String.format("sys.path.append('%s')\n",path.substring(0,index)));
-                    sb.append(String.format("import %s\n",path.substring(index+1)));
-                }
-            }
 
             sb.append("\n\n");
             sb.append(String.format("import py_%s_node\n",taskAppId));
@@ -96,7 +85,14 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
 
     @Override
     protected String commandType() {
-        return PYTHON;
+
+        String envPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + "conf"+
+                Constants.SINGLE_SLASH +"env" + Constants.SINGLE_SLASH + Constants.ESCHEDULER_ENV_SH;
+        String pythonHome = getPythonHome(envPath);
+        if (StringUtils.isEmpty(pythonHome)){
+            return PYTHON;
+        }
+        return pythonHome;
     }
 
     @Override
@@ -109,4 +105,45 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
         return true;
     }
 
+
+    /**
+     *  get python home
+     * @param envPath
+     * @return
+     */
+    private static String getPythonHome(String envPath){
+        BufferedReader br = null;
+        String line = null;
+        StringBuilder sb = new StringBuilder();
+        try {
+            br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
+            while ((line = br.readLine()) != null){
+                if (line.contains(Constants.PYTHON_HOME)){
+                    sb.append(line);
+                    break;
+                }
+            }
+            String result = sb.toString();
+            if (org.apache.commons.lang.StringUtils.isEmpty(result)){
+                return null;
+            }
+            String[] arrs = result.split("=");
+            if (arrs.length == 2){
+                return arrs[1];
+            }
+
+        }catch (IOException e){
+            logger.error("read file failed : " + e.getMessage(),e);
+        }finally {
+            try {
+                if (br != null){
+                    br.close();
+                }
+            } catch (IOException e) {
+                logger.error(e.getMessage(),e);
+            }
+        }
+        return null;
+    }
+
 }

+ 1 - 1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java

@@ -72,7 +72,7 @@ public class PythonTask extends AbstractTask {
 
     this.pythonProcessTask = new PythonCommandExecutor(this::logHandle,
             taskProps.getTaskDir(), taskProps.getTaskAppId(),
-            taskProps.getTenantCode(), CommonUtils.getPythonSystemEnvPath(), taskProps.getTaskStartTime(),
+            taskProps.getTenantCode(), null, taskProps.getTaskStartTime(),
             taskProps.getTaskTimeout(), logger);
     this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
   }

+ 65 - 0
escheduler-server/src/test/java/cn/escheduler/server/worker/EnvFileTest.java

@@ -0,0 +1,65 @@
+package cn.escheduler.server.worker;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Created by qiaozhanwei on 2019/4/15.
+ */
+public class EnvFileTest {
+
+    private static  final Logger logger = LoggerFactory.getLogger(EnvFileTest.class);
+
+    public static void main(String[] args) {
+        String path = System.getProperty("user.dir")+"\\script\\env\\.escheduler_env.sh";
+        String pythonHome = getPythonHome(path);
+        logger.info(pythonHome);
+
+    }
+
+    /**
+     *  get python home
+     * @param path
+     * @return
+     */
+    private static String getPythonHome(String path){
+        BufferedReader br = null;
+        String line = null;
+        StringBuilder sb = new StringBuilder();
+        try {
+            br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+            while ((line = br.readLine()) != null){
+                if (line.contains("PYTHON_HOME")){
+                    sb.append(line);
+                    break;
+                }
+            }
+            String result = sb.toString();
+            if (StringUtils.isEmpty(result)){
+                return null;
+            }
+            String[] arrs = result.split("=");
+            if (arrs.length == 2){
+                return arrs[1];
+            }
+
+        }catch (IOException e){
+            logger.error("read file failed : " + e.getMessage(),e);
+        }finally {
+            try {
+                if (br != null){
+                    br.close();
+                }
+            } catch (IOException e) {
+                logger.error(e.getMessage(),e);
+            }
+        }
+        return null;
+    }
+}

+ 1 - 1
pom.xml

@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>cn.analysys</groupId>
 	<artifactId>escheduler</artifactId>
-	<version>1.0.0-SNAPSHOT</version>
+	<version>1.0.1-SNAPSHOT</version>
 	<packaging>pom</packaging>
 	<name>escheduler</name>
 	<url>http://maven.apache.org</url>

+ 0 - 12
script/env/escheduler_env.py

@@ -1,12 +0,0 @@
-import os
-
-HADOOP_HOME="/opt/soft/hadoop"
-SPARK_HOME1="/opt/soft/spark1"
-SPARK_HOME2="/opt/soft/spark2"
-PYTHON_HOME="/opt/soft/python"
-JAVA_HOME="/opt/soft/java"
-HIVE_HOME="/opt/soft/hive"
-PATH=os.environ['PATH']
-PATH="%s/bin:%s/bin:%s/bin:%s/bin:%s/bin:%s/bin:%s"%(HIVE_HOME,HADOOP_HOME,SPARK_HOME1,SPARK_HOME2,JAVA_HOME,PYTHON_HOME,PATH)
-
-os.putenv('PATH','%s'%PATH)