Prechádzať zdrojové kódy

[Feature-3985][Datax] Datax supports setting up running memory (#3986)

* Datax supports setting up running memory

* Datax supports setting up running memory

* Datax supports setting up running memory

* When running a task, the resource file is lost, which results in an error

* add unit test

* add unit test

* add unit test

* add test unit

* add test unit

* add test unit

* fix code smell

* add test unit

* add test unit
BoYiZhang 4 rokov pred
rodič
commit
fe3026627f

+ 47 - 18
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java

@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.common.task.datax;
 
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.dolphinscheduler.common.task.datax;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * DataX parameter
@@ -89,6 +90,16 @@ public class DataxParameters extends AbstractParameters {
      */
     private int jobSpeedRecord;
 
+    /**
+     * Xms memory
+     */
+    private int xms;
+
+    /**
+     * Xmx memory
+     */
+    private int xmx;
+
     public int getCustomConfig() {
         return customConfig;
     }
@@ -185,6 +196,22 @@ public class DataxParameters extends AbstractParameters {
         this.jobSpeedRecord = jobSpeedRecord;
     }
 
+    public int getXms() {
+        return xms;
+    }
+
+    public void setXms(int xms) {
+        this.xms = xms;
+    }
+
+    public int getXmx() {
+        return xmx;
+    }
+
+    public void setXmx(int xmx) {
+        this.xmx = xmx;
+    }
+
     @Override
     public boolean checkParameters() {
         if (customConfig == Flag.NO.ordinal()) {
@@ -204,19 +231,21 @@ public class DataxParameters extends AbstractParameters {
 
     @Override
     public String toString() {
-        return "DataxParameters{" +
-                "customConfig=" + customConfig +
-                ", json='" + json + '\'' +
-                ", dsType='" + dsType + '\'' +
-                ", dataSource=" + dataSource +
-                ", dtType='" + dtType + '\'' +
-                ", dataTarget=" + dataTarget +
-                ", sql='" + sql + '\'' +
-                ", targetTable='" + targetTable + '\'' +
-                ", preStatements=" + preStatements +
-                ", postStatements=" + postStatements +
-                ", jobSpeedByte=" + jobSpeedByte +
-                ", jobSpeedRecord=" + jobSpeedRecord +
-                '}';
+        return "DataxParameters{"
+                + "customConfig=" + customConfig
+                + ", json='" + json + '\''
+                + ", dsType='" + dsType + '\''
+                + ", dataSource=" + dataSource
+                + ", dtType='" + dtType + '\''
+                + ", dataTarget=" + dataTarget
+                + ", sql='" + sql + '\''
+                + ", targetTable='" + targetTable + '\''
+                + ", preStatements=" + preStatements
+                + ", postStatements=" + postStatements
+                + ", jobSpeedByte=" + jobSpeedByte
+                + ", jobSpeedRecord=" + jobSpeedRecord
+                + ", xms=" + xms
+                + ", xmx=" + xmx
+                + '}';
     }
 }

+ 93 - 0
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task;
+
+import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DataxParametersTest {
+
+    /**
+     * jvm parameters
+     */
+    public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" ";
+
+    @Test
+    public void testLoadJvmEnv()   {
+
+        DataxParameters dataxParameters = new DataxParameters();
+        dataxParameters.setXms(0);
+        dataxParameters.setXmx(-100);
+
+        String actual =  loadJvmEnvTest(dataxParameters);
+
+        String except = " --jvm=\"-Xms1G -Xmx1G\" ";
+        Assert.assertEquals(except,actual);
+
+        dataxParameters.setXms(13);
+        dataxParameters.setXmx(14);
+        actual =  loadJvmEnvTest(dataxParameters);
+        except = " --jvm=\"-Xms13G -Xmx14G\" ";
+        Assert.assertEquals(except,actual);
+
+    }
+
+    @Test
+    public void testToString()   {
+
+        DataxParameters dataxParameters = new DataxParameters();
+        dataxParameters.setCustomConfig(0);
+        dataxParameters.setXms(0);
+        dataxParameters.setXmx(-100);
+        dataxParameters.setDataSource(1);
+        dataxParameters.setDataTarget(1);
+        dataxParameters.setDsType("MYSQL");
+        dataxParameters.setDtType("MYSQL");
+        dataxParameters.setJobSpeedByte(1);
+        dataxParameters.setJobSpeedRecord(1);
+        dataxParameters.setJson("json");
+
+        String expected = "DataxParameters"
+                + "{"
+                + "customConfig=0, "
+                + "json='json', "
+                + "dsType='MYSQL', "
+                + "dataSource=1, "
+                + "dtType='MYSQL', "
+                + "dataTarget=1, "
+                + "sql='null', "
+                + "targetTable='null', "
+                + "preStatements=null, "
+                + "postStatements=null, "
+                + "jobSpeedByte=1, "
+                + "jobSpeedRecord=1, "
+                + "xms=0, "
+                + "xmx=-100"
+                + "}";
+
+        Assert.assertEquals(expected,dataxParameters.toString());
+    }
+
+    public String loadJvmEnvTest(DataxParameters dataXParameters) {
+        int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
+        int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();
+        return String.format(JVM_EVN, xms, xmx);
+    }
+}

+ 62 - 54
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@@ -14,27 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.worker.task.datax;
 
+package org.apache.dolphinscheduler.server.worker.task.datax;
 
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
-import com.alibaba.druid.sql.ast.statement.*;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.FileUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
@@ -46,7 +37,8 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
-import org.slf4j.Logger;
+
+import org.apache.commons.io.FileUtils;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -56,25 +48,48 @@ import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import org.slf4j.Logger;
+
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.druid.sql.ast.statement.SQLSelect;
+import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * DataX task
  */
 public class DataxTask extends AbstractTask {
 
+    /**
+     * jvm parameters
+     */
+    public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" ";
     /**
      * python process(datax only supports version 2.7 by default)
      */
     private static final String DATAX_PYTHON = "python2.7";
-
     /**
      * datax home path
      */
     private static final String DATAX_HOME_EVN = "${DATAX_HOME}";
-
     /**
      * datax channel count
      */
@@ -97,6 +112,7 @@ public class DataxTask extends AbstractTask {
 
     /**
      * constructor
+     *
      * @param taskExecutionContext taskExecutionContext
      * @param logger logger
      */
@@ -104,9 +120,8 @@ public class DataxTask extends AbstractTask {
         super(taskExecutionContext, logger);
         this.taskExecutionContext = taskExecutionContext;
 
-
         this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskExecutionContext,logger);
+                taskExecutionContext, logger);
     }
 
     /**
@@ -149,9 +164,7 @@ public class DataxTask extends AbstractTask {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(commandExecuteResult.getAppIds());
             setProcessId(commandExecuteResult.getProcessId());
-        }
-        catch (Exception e) {
-            logger.error("datax task failure", e);
+        } catch (Exception e) {
             setExitStatusCode(Constants.EXIT_CODE_FAILURE);
             throw e;
         }
@@ -189,9 +202,9 @@ public class DataxTask extends AbstractTask {
             return fileName;
         }
 
-        if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){
+        if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()) {
             json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
-        }else {
+        } else {
             ObjectNode job = JSONUtils.createObjectNode();
             job.putArray("content").addAll(buildDataxJobContentJson());
             job.set("setting", buildDataxJobSettingJson());
@@ -248,7 +261,6 @@ public class DataxTask extends AbstractTask {
         readerParam.put("password", dataSourceCfg.getPassword());
         readerParam.putArray("connection").addAll(readerConnArr);
 
-
         ObjectNode reader = JSONUtils.createObjectNode();
         reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
         reader.set("parameter", readerParam);
@@ -277,7 +289,6 @@ public class DataxTask extends AbstractTask {
         }
         writerParam.putArray("connection").addAll(writerConnArr);
 
-
         if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
             ArrayNode preSqlArr = writerParam.putArray("preSql");
             for (String preSql : dataXParameters.getPreStatements()) {
@@ -368,7 +379,7 @@ public class DataxTask extends AbstractTask {
      * @throws Exception if error throws Exception
      */
     private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
-        throws Exception {
+            throws Exception {
         // generate scripts
         String fileName = String.format("%s/%s_node.%s",
                 taskExecutionContext.getExecutePath(),
@@ -387,6 +398,7 @@ public class DataxTask extends AbstractTask {
         sbr.append(" ");
         sbr.append(DATAX_HOME_EVN);
         sbr.append(" ");
+        sbr.append(loadJvmEnv(dataXParameters));
         sbr.append(jobConfigFilePath);
 
         // replace placeholder
@@ -409,17 +421,19 @@ public class DataxTask extends AbstractTask {
         return fileName;
     }
 
+    public String loadJvmEnv(DataxParameters dataXParameters) {
+        int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
+        int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();
+        return String.format(JVM_EVN, xms, xmx);
+    }
+
     /**
      * parsing synchronized column names in SQL statements
      *
-     * @param dsType
-     *            the database type of the data source
-     * @param dtType
-     *            the database type of the data target
-     * @param dataSourceCfg
-     *            the database connection parameters of the data source
-     * @param sql
-     *            sql for data synchronization
+     * @param dsType the database type of the data source
+     * @param dtType the database type of the data target
+     * @param dataSourceCfg the database connection parameters of the data source
+     * @param sql sql for data synchronization
      * @return Keyword converted column names
      */
     private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) {
@@ -438,10 +452,8 @@ public class DataxTask extends AbstractTask {
     /**
      * try grammatical parsing column
      *
-     * @param dbType
-     *            database type
-     * @param sql
-     *            sql for data synchronization
+     * @param dbType database type
+     * @param sql sql for data synchronization
      * @return column name array
      * @throws RuntimeException if error throws RuntimeException
      */
@@ -453,16 +465,16 @@ public class DataxTask extends AbstractTask {
             notNull(parser, String.format("database driver [%s] is not support", dbType.toString()));
 
             SQLStatement sqlStatement = parser.parseStatement();
-            SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement;
+            SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement;
             SQLSelect sqlSelect = sqlSelectStatement.getSelect();
 
             List<SQLSelectItem> selectItemList = null;
             if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
-                SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery();
+                SQLSelectQueryBlock block = (SQLSelectQueryBlock) sqlSelect.getQuery();
                 selectItemList = block.getSelectList();
             } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
-                SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery();
-                SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight();
+                SQLUnionQuery unionQuery = (SQLUnionQuery) sqlSelect.getQuery();
+                SQLSelectQueryBlock block = (SQLSelectQueryBlock) unionQuery.getRight();
                 selectItemList = block.getSelectList();
             }
 
@@ -470,7 +482,7 @@ public class DataxTask extends AbstractTask {
                     String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
 
             columnNames = new String[selectItemList.size()];
-            for (int i = 0; i < selectItemList.size(); i++ ) {
+            for (int i = 0; i < selectItemList.size(); i++) {
                 SQLSelectItem item = selectItemList.get(i);
 
                 String columnName = null;
@@ -479,10 +491,10 @@ public class DataxTask extends AbstractTask {
                     columnName = item.getAlias();
                 } else if (item.getExpr() != null) {
                     if (item.getExpr() instanceof SQLPropertyExpr) {
-                        SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr();
+                        SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr();
                         columnName = expr.getName();
                     } else if (item.getExpr() instanceof SQLIdentifierExpr) {
-                        SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr();
+                        SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr();
                         columnName = expr.getName();
                     }
                 } else {
@@ -497,8 +509,7 @@ public class DataxTask extends AbstractTask {
 
                 columnNames[i] = columnName;
             }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             logger.warn(e.getMessage(), e);
             return null;
         }
@@ -509,10 +520,8 @@ public class DataxTask extends AbstractTask {
     /**
      * try to execute sql to resolve column names
      *
-     * @param baseDataSource
-     *            the database connection parameters
-     * @param sql
-     *            sql for data synchronization
+     * @param baseDataSource the database connection parameters
+     * @param sql sql for data synchronization
      * @return column name array
      */
     public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) {
@@ -529,11 +538,10 @@ public class DataxTask extends AbstractTask {
             ResultSetMetaData md = resultSet.getMetaData();
             int num = md.getColumnCount();
             columnNames = new String[num];
-            for (int i = 1; i <= num; i++ ) {
+            for (int i = 1; i <= num; i++) {
                 columnNames[i - 1] = md.getColumnName(i);
             }
-        }
-        catch (SQLException e) {
+        } catch (SQLException e) {
             logger.warn(e.getMessage(), e);
             return null;
         }

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 70 - 18
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java


+ 35 - 4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue

@@ -144,6 +144,22 @@
         </div>
       </m-list-box>
     </div>
+    <div class="clearfix list">
+      <div class="text-box">
+        <span>{{$t('Running Memory')}}</span>
+      </div>
+      <div class="cont-box">
+        <span >{{$t('Min Memory')}}</span>
+        <m-select-input v-model="xms" :list="[1,2,3,4]">
+        </m-select-input>
+        <span>&nbsp;&nbsp;&nbsp;G &nbsp;&nbsp;</span>
+        <span >{{$t('Max Memory')}}</span>
+        <m-select-input v-model="xmx" :list="[1,2,3,4]">
+        </m-select-input>
+        <span>&nbsp;&nbsp;&nbsp;G</span>
+      </div>
+    </div>
+
   </div>
 </template>
 <script>
@@ -196,6 +212,10 @@
         // Custom parameter
         localParams: [],
         customConfig: 0,
+        //jvm memory xms
+        xms: 1,
+        //jvm memory xms
+        xmx: 1,
       }
     },
     mixins: [disabledState],
@@ -324,7 +344,9 @@
           this.$emit('on-params', {
             customConfig: this.customConfig,
             json: jsonEditor.getValue(),
-            localParams: this.localParams
+            localParams: this.localParams,
+            xms:+this.xms,
+            xmx:+this.xmx
           })
           return true
         } else {
@@ -358,6 +380,7 @@
             return false
           }
 
+          debugger
           // storage
           this.$emit('on-params', {
             customConfig: this.customConfig,
@@ -370,7 +393,9 @@
             jobSpeedByte: this.jobSpeedByte * 1024,
             jobSpeedRecord: this.jobSpeedRecord,
             preStatements: this.preStatements,
-            postStatements: this.postStatements
+            postStatements: this.postStatements,
+            xms:+this.xms,
+            xmx:+this.xmx
           })
           return true
         }
@@ -445,7 +470,9 @@
           jobSpeedByte: this.jobSpeedByte * 1024,
           jobSpeedRecord: this.jobSpeedRecord,
           preStatements: this.preStatements,
-          postStatements: this.postStatements
+          postStatements: this.postStatements,
+          xms: +this.xms,
+          xmx: +this.xmx,
         });
       },
       _destroyEditor () {
@@ -468,6 +495,10 @@
 
       // Non-null objects represent backfill
       if (!_.isEmpty(o)) {
+
+        // set jvm memory
+        this.xms = o.params.xms || 1 ;
+        this.xmx = o.params.xmx || 1 ;
         // backfill
         if(o.params.customConfig == 0) {
           this.customConfig = 0
@@ -544,4 +575,4 @@
     right: -12px;
     top: -16px;
   }
-</style>
+</style>

+ 3 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@@ -661,5 +661,8 @@ export default {
   'Batch move': 'Batch move',
   Version: 'Version',
   'Pre tasks': 'Pre tasks',
+  'Running Memory':'Running Memory',
+  'Max Memory':'Max Memory',
+  'Min Memory':'Min Memory',
   'The workflow canvas is abnormal and cannot be saved, please recreate': 'The workflow canvas is abnormal and cannot be saved, please recreate'
 }

+ 3 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -657,5 +657,8 @@ export default {
   'Batch move': '批量移动',
   Version: '版本',
   'Pre tasks': '前置任务',
+  'Running Memory':'运行内存',
+  'Max Memory':'最大内存',
+  'Min Memory':'最小内存',
   'The workflow canvas is abnormal and cannot be saved, please recreate': '该工作流画布异常,无法保存,请重新创建'
 }

+ 1 - 0
pom.xml

@@ -758,6 +758,7 @@
                         <include>**/common/os/OshiTest.java</include>
                         <include>**/common/os/OSUtilsTest.java</include>
                         <include>**/common/shell/ShellExecutorTest.java</include>
+                        <include>**/common/task/DataxParametersTest.java</include>
                         <include>**/common/task/EntityTestUtils.java</include>
                         <include>**/common/task/FlinkParametersTest.java</include>
                         <include>**/common/task/HttpParametersTest.java</include>