Browse Source

Set charset in upgrading tools to solve read upgrade sql mojibake (#14955)

Wenjun Ruan 1 year ago
parent
commit
020092ed1e

+ 137 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/sql/ClasspathSqlScriptParser.java

@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.sql;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+public class ClasspathSqlScriptParser implements SqlScriptParser {
+
+    private final String sqlScriptPath;
+
+    private final Charset charset;
+
+    public ClasspathSqlScriptParser(String sqlScriptPath) {
+        this.sqlScriptPath = sqlScriptPath;
+        this.charset = StandardCharsets.UTF_8;
+    }
+
+    @Override
+    public List<String> getAllSql() throws IOException {
+        Resource sqlScriptResource = new ClassPathResource(sqlScriptPath);
+        List<String> result = new ArrayList<>();
+        try (
+                InputStream inputStream = sqlScriptResource.getInputStream();
+                Reader sqlScriptReader = new InputStreamReader(inputStream, charset);
+                LineNumberReader lineNumberReader = new LineNumberReader(sqlScriptReader)) {
+            String sql;
+            do {
+                sql = parseNextSql(lineNumberReader);
+                if (StringUtils.isNotBlank(sql)) {
+                    result.add(sql);
+                }
+            } while (StringUtils.isNotBlank(sql));
+        }
+        return result;
+    }
+
+    private String parseNextSql(LineNumberReader lineNumberReader) throws IOException {
+        String line;
+        while ((line = lineNumberReader.readLine()) != null) {
+            String trimLine = line.trim();
+            if (StringUtils.isEmpty(trimLine) || isComment(trimLine)) {
+                // Skip the empty line, comment line
+                continue;
+            }
+            if (trimLine.startsWith("/*")) {
+                skipLicenseHeader(lineNumberReader);
+                continue;
+            }
+            if (trimLine.startsWith("delimiter")) {
+                // begin to parse processor, until delimiter ;
+                String[] split = trimLine.split(" ");
+                return parseProcedure(lineNumberReader, split[1]);
+            }
+            // begin to parse sql until;
+            List<String> sqlLines = new ArrayList<>();
+            sqlLines.add(line);
+            while (!line.endsWith(";")) {
+                line = lineNumberReader.readLine();
+                if (line == null) {
+                    break;
+                }
+                if (StringUtils.isBlank(line)) {
+                    continue;
+                }
+                sqlLines.add(line);
+            }
+            return String.join("\n", sqlLines);
+        }
+        return null;
+    }
+
+    private void skipLicenseHeader(LineNumberReader lineNumberReader) throws IOException {
+        String line;
+        while ((line = lineNumberReader.readLine()) != null) {
+            String trimLine = line.trim();
+            if (StringUtils.isEmpty(trimLine) || isComment(trimLine)) {
+                // Skip the empty line, comment line
+                continue;
+            }
+            if (line.startsWith("*/")) {
+                break;
+            }
+        }
+    }
+
+    private String parseProcedure(LineNumberReader lineNumberReader, String delimiter) throws IOException {
+        List<String> sqlLines = new ArrayList<>();
+        // begin to parse processor, until delimiter ;
+        String line;
+        while (true) {
+            line = lineNumberReader.readLine();
+            if (line == null) {
+                break;
+            }
+            if (StringUtils.isBlank(line)) {
+                continue;
+            }
+            if (line.startsWith(delimiter)) {
+                break;
+            }
+            sqlLines.add(line);
+        }
+        return String.join("\n", sqlLines);
+    }
+
+    private boolean isComment(String line) {
+        return line.startsWith("--") || line.startsWith("//");
+    }
+}

+ 6 - 17
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ScriptRunnerTest.java

@@ -14,25 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.common.utils;
 
-import java.io.StringReader;
+package org.apache.dolphinscheduler.common.sql;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.util.List;
 
-public class ScriptRunnerTest {
+public interface SqlScriptParser {
+
+    List<String> getAllSql() throws IOException;
 
-    @Test
-    public void testRunScript() {
-        // connection is null
-        Exception exception = null;
-        ScriptRunner s = new ScriptRunner(null, true, true);
-        try {
-            s.runScript(new StringReader("select 1"));
-        } catch (Exception e) {
-            exception = e;
-        }
-        Assertions.assertNotNull(exception);
-    }
 }

+ 65 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/sql/SqlScriptRunner.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.sql;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SqlScriptRunner {
+
+    private final DataSource dataSource;
+
+    private final SqlScriptParser sqlScriptParser;
+
+    /**
+     * @param dataSource        DataSource which used to execute the sql script.
+     * @param sqlScriptFilePath Sqk script file path, the path should under classpath.
+     */
+    public SqlScriptRunner(DataSource dataSource, String sqlScriptFilePath) {
+        this.dataSource = dataSource;
+        this.sqlScriptParser = new ClasspathSqlScriptParser(sqlScriptFilePath);
+    }
+
+    public void execute() throws SQLException, IOException {
+        List<String> allSql = sqlScriptParser.getAllSql();
+        try (Connection connection = dataSource.getConnection()) {
+            for (String sql : allSql) {
+                if (StringUtils.isBlank(sql)) {
+                    continue;
+                }
+                try (Statement statement = connection.createStatement()) {
+                    // Since some sql doesn't have result so we believe if there is no exception then we think the sql
+                    // execute success.
+                    statement.execute(sql);
+                    log.info("Execute sql: {} success", sql);
+                }
+            }
+        }
+    }
+
+}

+ 0 - 162
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java

@@ -1,162 +0,0 @@
-/*
- * Copyright 2004-2020 the original author or authors.
- * <p>
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import java.io.IOException;
-import java.io.LineNumberReader;
-import java.io.Reader;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Tool to run database scripts
- */
-@Slf4j
-public class ScriptRunner {
-
-    private static final String DEFAULT_DELIMITER = ";";
-
-    private final Connection connection;
-
-    private final boolean stopOnError;
-    private final boolean autoCommit;
-
-    private String delimiter = DEFAULT_DELIMITER;
-    private boolean fullLineDelimiter = false;
-
-    public ScriptRunner(Connection connection, boolean autoCommit, boolean stopOnError) {
-        this.connection = connection;
-        this.autoCommit = autoCommit;
-        this.stopOnError = stopOnError;
-    }
-
-    public void setDelimiter(String delimiter, boolean fullLineDelimiter) {
-        this.delimiter = delimiter;
-        this.fullLineDelimiter = fullLineDelimiter;
-    }
-
-    /**
-     * Runs an SQL script (read in using the Reader parameter)
-     *
-     * @param reader - the source of the script
-     * @throws IOException errors
-     * @throws SQLException errors
-     */
-    public void runScript(Reader reader) throws IOException, SQLException {
-        try {
-            boolean originalAutoCommit = connection.getAutoCommit();
-            try {
-                if (originalAutoCommit != this.autoCommit) {
-                    connection.setAutoCommit(this.autoCommit);
-                }
-                runScript(connection, reader);
-            } finally {
-                connection.setAutoCommit(originalAutoCommit);
-            }
-        } catch (IOException | SQLException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new RuntimeException("Error running script.  Cause: " + e, e);
-        }
-    }
-
-    /**
-     * Runs an SQL script (read in using the Reader parameter) using the connection
-     * passed in
-     *
-     * @param conn - the connection to use for the script
-     * @param reader - the source of the script
-     * @throws SQLException if any SQL errors occur
-     * @throws IOException if there is an error reading from the Reader
-     */
-    private void runScript(Connection conn, Reader reader) throws IOException, SQLException {
-        List<String> command = null;
-        try {
-            LineNumberReader lineReader = new LineNumberReader(reader);
-            String line;
-            while ((line = lineReader.readLine()) != null) {
-                if (command == null) {
-                    command = new ArrayList<>();
-                }
-                String trimmedLine = line.trim();
-                if (trimmedLine.startsWith("--")) {
-                    log.info("\n{}", trimmedLine);
-                } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) {
-                    // Do nothing
-                } else if (trimmedLine.startsWith("delimiter")) {
-                    String newDelimiter = trimmedLine.split(" ")[1];
-                    this.setDelimiter(newDelimiter, fullLineDelimiter);
-
-                } else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter())
-                        || fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
-                    command.add(line.substring(0, line.lastIndexOf(getDelimiter())));
-                    log.info("\n{}", String.join("\n", command));
-
-                    try (Statement statement = conn.createStatement()) {
-                        statement.execute(String.join(" ", command));
-                        try (ResultSet rs = statement.getResultSet()) {
-                            if (stopOnError && rs != null) {
-                                ResultSetMetaData md = rs.getMetaData();
-                                int cols = md.getColumnCount();
-                                for (int i = 1; i < cols; i++) {
-                                    String name = md.getColumnLabel(i);
-                                    log.info("{} \t", name);
-                                }
-                                log.info("");
-                                while (rs.next()) {
-                                    for (int i = 1; i < cols; i++) {
-                                        String value = rs.getString(i);
-                                        log.info("{} \t", value);
-                                    }
-                                    log.info("");
-                                }
-                            }
-                        }
-                    } catch (SQLException e) {
-                        log.error("SQLException", e);
-                        throw e;
-                    }
-
-                    command = null;
-                    Thread.yield();
-                } else {
-                    command.add(line);
-                }
-            }
-
-        } catch (SQLException e) {
-            log.error("Error executing: {}", command);
-            throw e;
-        } catch (IOException e) {
-            e.fillInStackTrace();
-            log.error("Error executing: {}", command);
-            throw e;
-        }
-    }
-
-    private String getDelimiter() {
-        return delimiter;
-    }
-
-}

+ 52 - 0
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/sql/ClasspathSqlScriptParserTest.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.sql;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class ClasspathSqlScriptParserTest {
+
+    @Test
+    void testMysqlDmlSql() throws IOException {
+        ClasspathSqlScriptParser classpathSqlScriptParser = new ClasspathSqlScriptParser("sql/mysql_dml.sql");
+        List<String> allSql = classpathSqlScriptParser.getAllSql();
+        Assertions.assertEquals("drop PROCEDURE if EXISTS dolphin_t_ds_tenant_insert_default;", allSql.get(0));
+        Assertions.assertEquals("CREATE PROCEDURE dolphin_t_ds_tenant_insert_default()\n" +
+                "BEGIN\n" +
+                "    IF\n" +
+                "NOT EXISTS(SELECT 1\n" +
+                "                   FROM t_ds_tenant\n" +
+                "                   WHERE id = -1)\n" +
+                "    THEN\n" +
+                "        INSERT INTO `t_ds_tenant` VALUES ('-1', 'default', 'default tenant', '1', current_timestamp, current_timestamp);\n"
+                +
+                "END IF;\n" +
+                "END;", String.join("", allSql.get(1)));
+    }
+
+    @Test
+    void testMysqlDdlSql() throws IOException {
+        ClasspathSqlScriptParser classpathSqlScriptParser = new ClasspathSqlScriptParser("sql/mysql_ddl.sql");
+        List<String> allSql = classpathSqlScriptParser.getAllSql();
+        Assertions.assertEquals("ALTER TABLE t_ds_process_definition DROP tenant_id;", allSql.get(0));
+    }
+}

File diff suppressed because it is too large
+ 44 - 0
dolphinscheduler-common/src/test/resources/sql/mysql_ddl.sql


+ 84 - 0
dolphinscheduler-common/src/test/resources/sql/mysql_dml.sql

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+-- ############################# t_ds_tenant #############################
+drop PROCEDURE if EXISTS dolphin_t_ds_tenant_insert_default;
+delimiter d//
+CREATE PROCEDURE dolphin_t_ds_tenant_insert_default()
+BEGIN
+    IF
+NOT EXISTS(SELECT 1
+                   FROM t_ds_tenant
+                   WHERE id = -1)
+    THEN
+        INSERT INTO `t_ds_tenant` VALUES ('-1', 'default', 'default tenant', '1', current_timestamp, current_timestamp);
+END IF;
+END;
+d//
+
+delimiter ;
+CALL dolphin_t_ds_tenant_insert_default();
+DROP PROCEDURE dolphin_t_ds_tenant_insert_default;
+
+-- tenant improvement
+UPDATE t_ds_schedules t1 JOIN t_ds_process_definition t2 ON t1.process_definition_code = t2.code LEFT JOIN t_ds_tenant t3 ON t2.tenant_id = t3.id SET t1.tenant_code = COALESCE(t3.tenant_code, 'default');
+UPDATE `t_ds_process_instance` SET `tenant_code` = 'default' WHERE `tenant_code` IS NULL;
+
+-- data quality support choose database
+INSERT IGNORE INTO `t_ds_dq_rule_input_entry`
+(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
+VALUES(30, 'src_database', 'select', '$t(src_database)', NULL, NULL, 'please select source database', 0, 0, 0, 1, 1, 1, 1, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_dq_rule_input_entry`
+(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
+VALUES(31, 'target_database', 'select', '$t(target_database)', NULL, NULL, 'please select target database', 0, 0, 0, 1, 1, 1, 1, current_timestamp, current_timestamp);
+
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(151, 1, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(152, 2, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(153, 3, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(154, 4, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(155, 5, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(156, 6, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(157, 7, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(158, 8, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(159, 9, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(160, 10, 30, NULL, 2, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(161, 3, 31, NULL, 6, current_timestamp, current_timestamp);
+INSERT IGNORE INTO `t_ds_relation_rule_input_entry`
+(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
+VALUES(162, 4, 31, NULL, 7, current_timestamp, current_timestamp);

+ 20 - 34
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java

@@ -17,13 +17,11 @@
 
 package org.apache.dolphinscheduler.tools.datasource.dao;
 
-import org.apache.dolphinscheduler.common.utils.ScriptRunner;
+import org.apache.dolphinscheduler.common.sql.SqlScriptRunner;
 import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.io.FileNotFoundException;
-import java.io.InputStreamReader;
-import java.io.Reader;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -33,9 +31,6 @@ import javax.sql.DataSource;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
-
 @Slf4j
 public abstract class UpgradeDao {
 
@@ -63,17 +58,13 @@ public abstract class UpgradeDao {
      * @param dbType db type
      */
     private void runInitSql(DbType dbType) {
-        String sqlFile = String.format("dolphinscheduler_%s.sql", dbType.getDescp());
-        Resource mysqlSQLFilePath = new ClassPathResource("sql/" + sqlFile);
-        try (Connection conn = dataSource.getConnection()) {
-            // Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler
-            ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true);
-            try (Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream())) {
-                initScriptRunner.runScript(initSqlReader);
-            }
-        } catch (Exception e) {
-            log.error("Execute init sql file: {} error", sqlFile, e);
-            throw new RuntimeException(String.format("Execute init sql file: %s error", sqlFile), e);
+        String sqlFilePath = String.format("sql/dolphinscheduler_%s.sql", dbType.getDescp());
+        SqlScriptRunner sqlScriptRunner = new SqlScriptRunner(dataSource, sqlFilePath);
+        try {
+            sqlScriptRunner.execute();
+            log.info("Success execute the sql initialize file: {}", sqlFilePath);
+        } catch (Exception ex) {
+            throw new RuntimeException("Execute initialize sql file: " + sqlFilePath + " error", ex);
         }
     }
 
@@ -120,14 +111,13 @@ public abstract class UpgradeDao {
 
     private void upgradeDolphinSchedulerDML(String schemaDir, String scriptFile) {
         String schemaVersion = schemaDir.split("_")[0];
-        Resource sqlFilePath = new ClassPathResource(
-                String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
-        try (Connection conn = dataSource.getConnection()) {
-            conn.setAutoCommit(false);
+        String sqlFilePath =
+                String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile);
+        try {
             // Execute the upgraded dolphinscheduler dml
-            ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
-            try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) {
-                scriptRunner.runScript(sqlReader);
+            SqlScriptRunner sqlScriptRunner = new SqlScriptRunner(dataSource, sqlFilePath);
+            sqlScriptRunner.execute();
+            try (Connection connection = dataSource.getConnection()) {
                 String upgradeSQL;
                 if (isExistsTable(T_VERSION_NAME)) {
                     // Change version in the version table to the new version
@@ -138,11 +128,10 @@ public abstract class UpgradeDao {
                 } else {
                     throw new RuntimeException("The version table does not exist");
                 }
-                try (PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) {
+                try (PreparedStatement pstmt = connection.prepareStatement(upgradeSQL)) {
                     pstmt.setString(1, schemaVersion);
                     pstmt.executeUpdate();
                 }
-                conn.commit();
             }
             log.info("Success execute the dml file, schemaDir:  {}, ddlScript: {}", schemaDir, scriptFile);
         } catch (FileNotFoundException e) {
@@ -161,15 +150,12 @@ public abstract class UpgradeDao {
      * @param schemaDir schemaDir
      */
     public void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
-        Resource sqlFilePath = new ClassPathResource(
-                String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
-        try (Connection conn = dataSource.getConnection()) {
-            conn.setAutoCommit(true);
+        String sqlFilePath =
+                String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile);
+        SqlScriptRunner sqlScriptRunner = new SqlScriptRunner(dataSource, sqlFilePath);
+        try {
             // Execute the dolphinscheduler ddl.sql for the upgrade
-            ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
-            try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) {
-                scriptRunner.runScript(sqlReader);
-            }
+            sqlScriptRunner.execute();
             log.info("Success execute the ddl file, schemaDir:  {}, ddlScript: {}", schemaDir, scriptFile);
         } catch (FileNotFoundException e) {
             log.error("Cannot find the DDL file, schemaDir:  {}, ddlScript: {}", schemaDir, scriptFile, e);