Browse Source

Remove datasource in procedure task (#13198)

Wenjun Ruan 2 years ago
parent
commit
b2336b0ce9

+ 7 - 0
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java

@@ -18,9 +18,12 @@
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
 
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.util.Map;
 
+import lombok.NonNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +46,10 @@ public class DataSourceProcessorProvider {
         return DataSourceClientProviderHolder.INSTANCE;
     }
 
+    public DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
+        return dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
+    }
+
     public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
         return dataSourcePluginManager.getDataSourceProcessorMap();
     }

+ 18 - 53
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@@ -21,8 +21,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -40,7 +40,6 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.sql.CallableStatement;
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.HashMap;
@@ -96,19 +95,12 @@ public class ProcedureTask extends AbstractTask {
                 procedureParameters.getMethod(),
                 procedureParameters.getLocalParams());
 
-        Connection connection = null;
-        CallableStatement stmt = null;
-        try {
-            // load class
-            DbType dbType = DbType.valueOf(procedureParameters.getType());
-            // get datasource
-            ConnectionParam connectionParam =
-                    DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
-                            procedureTaskExecutionContext.getConnectionParams());
-
-            // get jdbc connection
-            connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
-
+        DbType dbType = DbType.valueOf(procedureParameters.getType());
+        DataSourceProcessor dataSourceProcessor =
+                DataSourceProcessorProvider.getInstance().getDataSourceProcessor(dbType);
+        ConnectionParam connectionParams =
+                dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams());
+        try (Connection connection = dataSourceProcessor.getConnection(connectionParams)) {
             Map<Integer, Property> sqlParamsMap = new HashMap<>();
             Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
                     : taskExecutionContext.getPrepareParamsMap();
@@ -116,30 +108,26 @@ public class ProcedureTask extends AbstractTask {
                 // set out params before format sql
                 paramsMap.putAll(procedureParameters.getOutProperty());
             }
-
-            // format sql
             String proceduerSql = formatSql(sqlParamsMap, paramsMap);
             // call method
-            stmt = connection.prepareCall(proceduerSql);
-
-            // set timeout
-            setTimeout(stmt);
+            try (CallableStatement stmt = connection.prepareCall(proceduerSql)) {
+                // set timeout
+                setTimeout(stmt);
 
-            // outParameterMap
-            Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
+                // outParameterMap
+                Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
 
-            stmt.executeUpdate();
+                stmt.executeUpdate();
 
-            // print the output parameters to the log
-            printOutParameter(stmt, outParameterMap);
+                // print the output parameters to the log
+                printOutParameter(stmt, outParameterMap);
 
-            setExitStatusCode(EXIT_CODE_SUCCESS);
+                setExitStatusCode(EXIT_CODE_SUCCESS);
+            }
         } catch (Exception e) {
             setExitStatusCode(EXIT_CODE_FAILURE);
             logger.error("procedure task error", e);
             throw new TaskException("Execute procedure task failed", e);
-        } finally {
-            close(stmt, connection);
         }
     }
 
@@ -220,29 +208,6 @@ public class ProcedureTask extends AbstractTask {
         }
     }
 
-    /**
-     * close jdbc resource
-     *
-     * @param stmt       stmt
-     * @param connection connection
-     */
-    private void close(PreparedStatement stmt, Connection connection) {
-        if (stmt != null) {
-            try {
-                stmt.close();
-            } catch (SQLException e) {
-                logger.error("close prepared statement error : {}", e.getMessage(), e);
-            }
-        }
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                logger.error("close connection error : {}", e.getMessage(), e);
-            }
-        }
-    }
-
     /**
      * get output parameter
      *