Browse Source

[Feature-9177][Task] The sql task supports configuring segmentation notation to provide execution of multiple statements (#9917)

* Support sql segmentation to execute multiple functions.
Kerwin 3 years ago
parent
commit
0e7c98d465

+ 18 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java

@@ -113,6 +113,15 @@ public class SqlParameters extends AbstractParameters {
 
     private int limit;
 
+    /**
+     * segment separator
+     *
+     * <p>The segment separator is used
+     * when the data source does not support multi-segment SQL execution,
+     * and the client needs to split the SQL and execute it multiple times.</p>
+     */
+    private String segmentSeparator;
+
     public int getLimit() {
         return limit;
     }
@@ -225,6 +234,14 @@ public class SqlParameters extends AbstractParameters {
         this.groupId = groupId;
     }
 
+    public String getSegmentSeparator() {
+        return segmentSeparator;
+    }
+
+    public void setSegmentSeparator(String segmentSeparator) {
+        this.segmentSeparator = segmentSeparator;
+    }
+
     @Override
     public boolean checkParameters() {
         return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql);
@@ -292,6 +309,7 @@ public class SqlParameters extends AbstractParameters {
                 + ", sendEmail=" + sendEmail
                 + ", displayRows=" + displayRows
                 + ", limit=" + limit
+                + ", segmentSeparator=" + segmentSeparator
                 + ", udfs='" + udfs + '\''
                 + ", showType='" + showType + '\''
                 + ", connParams='" + connParams + '\''

+ 65 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlSplitter.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sql;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class SqlSplitter {
+
+    private SqlSplitter() {
+    }
+
+    private static final String LINE_SEPARATOR = "\n";
+
+    /**
+     * split sql by segment separator
+     * <p>The segment separator is used
+     * when the data source does not support multi-segment SQL execution,
+     * and the client needs to split the SQL and execute it multiple times.</p>
+     * @param sql
+     * @param segmentSeparator
+     * @return
+     */
+    public static List<String> split(String sql, String segmentSeparator) {
+        if (StringUtils.isBlank(segmentSeparator)) {
+            return Collections.singletonList(sql);
+        }
+
+        String[] lines = sql.split(LINE_SEPARATOR);
+        List<String> segments = new ArrayList<>();
+        StringBuilder stmt = new StringBuilder();
+        for (String line : lines) {
+            if (line.trim().isEmpty() || line.startsWith("--")) {
+                continue;
+            }
+            stmt.append(LINE_SEPARATOR).append(line);
+            if (line.trim().endsWith(segmentSeparator)) {
+                segments.add(stmt.toString());
+                stmt.setLength(0);
+            }
+        }
+        if (stmt.length() > 0) {
+            segments.add(stmt.toString());
+        }
+        return segments;
+    }
+}

+ 29 - 51
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@@ -136,7 +136,11 @@ public class SqlTask extends AbstractTaskExecutor {
                     sqlTaskExecutionContext.getConnectionParams());
 
             // ready to execute SQL and parameter entity Map
-            SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
+            List<SqlBinds> mainStatementSqlBinds = SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
+                    .stream()
+                    .map(this::getSqlAndSqlParamsMap)
+                    .collect(Collectors.toList());
+
             List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
                     .orElse(new ArrayList<>())
                     .stream()
@@ -151,7 +155,7 @@ public class SqlTask extends AbstractTaskExecutor {
             List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList(), logger);
 
             // execute sql task
-            executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+            executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
 
             setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
 
@@ -165,17 +169,16 @@ public class SqlTask extends AbstractTaskExecutor {
     /**
      * execute function and sql
      *
-     * @param mainSqlBinds main sql binds
+     * @param mainStatementsBinds main statements binds
      * @param preStatementsBinds pre statements binds
      * @param postStatementsBinds post statements binds
      * @param createFuncs create functions
      */
-    public void executeFuncAndSql(SqlBinds mainSqlBinds,
+    public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
                                   List<SqlBinds> preStatementsBinds,
                                   List<SqlBinds> postStatementsBinds,
                                   List<String> createFuncs) throws Exception {
         Connection connection = null;
-        PreparedStatement stmt = null;
         ResultSet resultSet = null;
         try {
 
@@ -186,30 +189,31 @@ public class SqlTask extends AbstractTaskExecutor {
                 createTempFunction(connection, createFuncs);
             }
 
-            // pre sql
-            preSql(connection, preStatementsBinds);
-            stmt = prepareStatementAndBind(connection, mainSqlBinds);
+            // pre execute
+            executeUpdate(connection, preStatementsBinds, "pre");
 
+            // main execute
             String result = null;
             // decide whether to executeQuery or executeUpdate based on sqlType
             if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
                 // query statements need to be convert to JsonArray and inserted into Alert to send
-                resultSet = stmt.executeQuery();
+                resultSet = executeQuery(connection, mainStatementsBinds.get(0), "main");
                 result = resultProcess(resultSet);
-
             } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
                 // non query statement
-                String updateResult = String.valueOf(stmt.executeUpdate());
+                String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
                 result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
             }
             //deal out params
             sqlParameters.dealOutParam(result);
-            postSql(connection, postStatementsBinds);
+
+            // post execute
+            executeUpdate(connection, postStatementsBinds, "post");
         } catch (Exception e) {
             logger.error("execute sql error: {}", e.getMessage());
             throw e;
         } finally {
-            close(resultSet, stmt, connection);
+            close(resultSet, connection);
         }
     }
 
@@ -288,37 +292,22 @@ public class SqlTask extends AbstractTaskExecutor {
         setTaskAlertInfo(taskAlertInfo);
     }
 
-    /**
-     * pre sql
-     *
-     * @param connection connection
-     * @param preStatementsBinds preStatementsBinds
-     */
-    private void preSql(Connection connection,
-                        List<SqlBinds> preStatementsBinds) throws Exception {
-        for (SqlBinds sqlBind : preStatementsBinds) {
-            try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
-                int result = pstmt.executeUpdate();
-                logger.info("pre statement execute result: {}, for sql: {}", result, sqlBind.getSql());
-
-            }
+    private ResultSet executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception {
+        try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) {
+            logger.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql());
+            return statement.executeQuery();
         }
     }
 
-    /**
-     * post sql
-     *
-     * @param connection connection
-     * @param postStatementsBinds postStatementsBinds
-     */
-    private void postSql(Connection connection,
-                         List<SqlBinds> postStatementsBinds) throws Exception {
-        for (SqlBinds sqlBind : postStatementsBinds) {
-            try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
-                int result = pstmt.executeUpdate();
-                logger.info("post statement execute result: {},for sql: {}", result, sqlBind.getSql());
+    private String executeUpdate(Connection connection, List<SqlBinds> statementsBinds, String handlerType) throws Exception {
+        int result = 0;
+        for (SqlBinds sqlBind : statementsBinds) {
+            try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBind)) {
+                result = statement.executeUpdate();
+                logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result, sqlBind.getSql());
             }
         }
+        return String.valueOf(result);
     }
 
     /**
@@ -341,12 +330,9 @@ public class SqlTask extends AbstractTaskExecutor {
      * close jdbc resource
      *
      * @param resultSet resultSet
-     * @param pstmt pstmt
      * @param connection connection
      */
-    private void close(ResultSet resultSet,
-                       PreparedStatement pstmt,
-                       Connection connection) {
+    private void close(ResultSet resultSet, Connection connection) {
         if (resultSet != null) {
             try {
                 resultSet.close();
@@ -355,14 +341,6 @@ public class SqlTask extends AbstractTaskExecutor {
             }
         }
 
-        if (pstmt != null) {
-            try {
-                pstmt.close();
-            } catch (SQLException e) {
-                logger.error("close prepared statement error : {}", e.getMessage(), e);
-            }
-        }
-
         if (connection != null) {
             try {
                 connection.close();

+ 2 - 0
dolphinscheduler-ui/src/locales/modules/en_US.ts

@@ -924,6 +924,8 @@ const project = {
     required: 'required',
     emr_flow_define_json: 'jobFlowDefineJson',
     emr_flow_define_json_tips: 'Please enter the definition of the job flow.',
+    segment_separator: 'Segment Execution Separator',
+    segment_separator_tips: 'Please enter the segment execution separator',
     zeppelin_note_id: 'zeppelinNoteId',
     zeppelin_note_id_tips: 'Please enter the note id of your zeppelin note',
     zeppelin_paragraph_id: 'zeppelinParagraphId',

+ 2 - 0
dolphinscheduler-ui/src/locales/modules/zh_CN.ts

@@ -913,6 +913,8 @@ const project = {
     required: '必填',
     emr_flow_define_json: 'jobFlowDefineJson',
     emr_flow_define_json_tips: '请输入工作流定义',
+    segment_separator: '分段执行符号',
+    segment_separator_tips: '请输入分段执行符号',
     zeppelin_note_id: 'zeppelin_note_id',
     zeppelin_note_id_tips: '请输入zeppelin note id',
     zeppelin_paragraph_id: 'zeppelin_paragraph_id',

+ 10 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts

@@ -24,6 +24,7 @@ import type { IJsonItem } from '../types'
 export function useSqlType(model: { [field: string]: any }): IJsonItem[] {
   const { t } = useI18n()
   const querySpan = computed(() => (model.sqlType === '0' ? 6 : 0))
+  const nonQuerySpan = computed(() => (model.sqlType === '1' ? 6 : 0))
   const emailSpan = computed(() =>
     model.sqlType === '0' && model.sendEmail ? 24 : 0
   )
@@ -67,6 +68,15 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] {
         required: true
       }
     },
+    {
+      type: 'input',
+      field: 'segmentSeparator',
+      name: t('project.node.segment_separator'),
+      props: {
+        placeholder: t('project.node.segment_separator_tips')
+      },
+      span: nonQuerySpan
+    },
     {
       type: 'switch',
       field: 'sendEmail',

+ 1 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@@ -179,6 +179,7 @@ export function formatParams(data: INodeData): {
     taskParams.sqlType = data.sqlType
     taskParams.preStatements = data.preStatements
     taskParams.postStatements = data.postStatements
+    taskParams.segmentSeparator = data.segmentSeparator
     taskParams.sendEmail = data.sendEmail
     taskParams.displayRows = data.displayRows
     if (data.sqlType === '0' && data.sendEmail) {

+ 1 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts

@@ -46,6 +46,7 @@ export function useSql({
     timeout: 30,
     type: 'MYSQL',
     displayRows: 10,
+    segmentSeparator: '',
     sql: '',
     sqlType: '0',
     preStatements: [],

+ 1 - 0
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@@ -241,6 +241,7 @@ interface ITaskParams {
   datasource?: string
   sql?: string
   sqlType?: string
+  segmentSeparator?: string
   sendEmail?: boolean
   displayRows?: number
   title?: string