|
@@ -17,9 +17,6 @@
|
|
|
|
|
|
package org.apache.dolphinscheduler.plugin.task.sql;
|
|
|
|
|
|
-import com.fasterxml.jackson.databind.node.ArrayNode;
|
|
|
-import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
|
-import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
|
|
|
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
|
|
@@ -38,7 +35,8 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
|
|
|
import org.apache.dolphinscheduler.spi.enums.DbType;
|
|
|
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
|
|
|
import org.apache.dolphinscheduler.spi.utils.StringUtils;
|
|
|
-import org.slf4j.Logger;
|
|
|
+
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.PreparedStatement;
|
|
@@ -48,6 +46,7 @@ import java.sql.SQLException;
|
|
|
import java.sql.Statement;
|
|
|
import java.text.MessageFormat;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -56,6 +55,11 @@ import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.databind.node.ArrayNode;
|
|
|
+import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
|
+
|
|
|
public class SqlTask extends AbstractTask {
|
|
|
|
|
|
|
|
@@ -88,6 +92,8 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
|
public static final int TEST_FLAG_YES = 1;
|
|
|
|
|
|
+ private static final String SQL_SEPARATOR = ";\n";
|
|
|
+
|
|
|
|
|
|
* Abstract Yarn Task
|
|
|
*
|
|
@@ -127,15 +133,18 @@ public class SqlTask extends AbstractTask {
|
|
|
sqlParameters.getConnParams(),
|
|
|
sqlParameters.getVarPool(),
|
|
|
sqlParameters.getLimit());
|
|
|
+ String separator = SQL_SEPARATOR;
|
|
|
try {
|
|
|
|
|
|
|
|
|
baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(
|
|
|
DbType.valueOf(sqlParameters.getType()),
|
|
|
sqlTaskExecutionContext.getConnectionParams());
|
|
|
-
|
|
|
+ if (DbType.valueOf(sqlParameters.getType()).isSupportMultipleStatement()) {
|
|
|
+ separator = "";
|
|
|
+ }
|
|
|
|
|
|
- List<SqlBinds> mainStatementSqlBinds = SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
|
|
|
+ List<SqlBinds> mainStatementSqlBinds = split(sqlParameters.getSql(), separator)
|
|
|
.stream()
|
|
|
.map(this::getSqlAndSqlParamsMap)
|
|
|
.collect(Collectors.toList());
|
|
@@ -170,6 +179,31 @@ public class SqlTask extends AbstractTask {
|
|
|
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ * split sql by segment separator
|
|
|
+ * <p>The segment separator is used
|
|
|
+ * when the data source does not support multi-segment SQL execution,
|
|
|
+ * and the client needs to split the SQL and execute it multiple times.</p>
|
|
|
+ * @param sql
|
|
|
+ * @param segmentSeparator
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public static List<String> split(String sql, String segmentSeparator) {
|
|
|
+ if (StringUtils.isEmpty(segmentSeparator)) {
|
|
|
+ return Collections.singletonList(sql);
|
|
|
+ }
|
|
|
+
|
|
|
+ String[] lines = sql.split(segmentSeparator);
|
|
|
+ List<String> segments = new ArrayList<>();
|
|
|
+ for (String line : lines) {
|
|
|
+ if (line.trim().isEmpty() || line.startsWith("--")) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ segments.add(line);
|
|
|
+ }
|
|
|
+ return segments;
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
* execute function and sql
|
|
|
*
|