Parcourir la source

Revert "datasource test and sql task Remove connection pool issues is #14179 (#14193)" (#14626)

This reverts commit e4fb5b30a45b5e2380841e8b37dff2f0408fc5a7.
Wenjun Ruan il y a 1 an
Parent
commit
6617e3fb72

+ 16 - 26
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java

@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.client;
 
+import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -24,13 +25,15 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.commons.lang3.StringUtils;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.jdbc.core.JdbcTemplate;
+
 import com.google.common.base.Stopwatch;
+import com.zaxxer.hikari.HikariDataSource;
 
 @Slf4j
 public class CommonDataSourceClient implements DataSourceClient {
@@ -39,7 +42,8 @@ public class CommonDataSourceClient implements DataSourceClient {
     public static final String COMMON_VALIDATION_QUERY = "select 1";
 
     protected final BaseConnectionParam baseConnectionParam;
-    protected Connection connection;
+    protected HikariDataSource dataSource;
+    protected JdbcTemplate jdbcTemplate;
 
     public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
         this.baseConnectionParam = baseConnectionParam;
@@ -59,7 +63,8 @@ public class CommonDataSourceClient implements DataSourceClient {
     }
 
     protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
-        this.connection = buildConn(baseConnectionParam);
+        this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType);
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
     }
 
     protected void checkUser(BaseConnectionParam baseConnectionParam) {
@@ -68,20 +73,6 @@ public class CommonDataSourceClient implements DataSourceClient {
         }
     }
 
-    private Connection buildConn(BaseConnectionParam baseConnectionParam) {
-        Connection conn = null;
-        try {
-            Class.forName(baseConnectionParam.getDriverClassName());
-            conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(),
-                    baseConnectionParam.getPassword());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Driver load fail", e);
-        } catch (SQLException e) {
-            throw new RuntimeException("JDBC connect failed", e);
-        }
-        return conn;
-    }
-
     protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) {
         baseConnectionParam.setUser(COMMON_USER);
     }
@@ -101,7 +92,7 @@ public class CommonDataSourceClient implements DataSourceClient {
         // Checking data source client
         Stopwatch stopwatch = Stopwatch.createStarted();
         try {
-            this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery();
+            this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());
         } catch (Exception e) {
             throw new RuntimeException("JDBC connect failed", e);
         } finally {
@@ -113,21 +104,20 @@ public class CommonDataSourceClient implements DataSourceClient {
     @Override
     public Connection getConnection() {
         try {
-            return connection.isClosed() ? buildConn(baseConnectionParam) : connection;
+            return this.dataSource.getConnection();
         } catch (SQLException e) {
-            throw new RuntimeException("get conn is fail", e);
+            log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e);
+            return null;
         }
     }
 
     @Override
     public void close() {
-        log.info("do close connection {}.", baseConnectionParam.getDatabase());
-        try {
-            connection.close();
-        } catch (SQLException e) {
-            log.info("colse connection fail");
-            throw new RuntimeException(e);
+        log.info("do close dataSource {}.", baseConnectionParam.getDatabase());
+        try (HikariDataSource closedDatasource = dataSource) {
+            // only close the resource
         }
+        this.jdbcTemplate = null;
     }
 
 }

+ 34 - 0
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java

@@ -25,9 +25,14 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
+import com.google.common.base.Stopwatch;
+
 @Slf4j
 public class AzureSQLDataSourceClient extends CommonDataSourceClient {
 
@@ -44,4 +49,33 @@ public class AzureSQLDataSourceClient extends CommonDataSourceClient {
         return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam);
     }
 
+    @Override
+    public void checkClient() {
+
+        AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam;
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String validationQuery = this.baseConnectionParam.getValidationQuery();
+        if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
+            // Checking data source client
+            try {
+                this.jdbcTemplate.execute(validationQuery);
+            } catch (Exception e) {
+                throw new RuntimeException("JDBC connect failed", e);
+            } finally {
+                log.info("Time to execute check jdbc client with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        } else {
+            try (Statement statement = getConnection().createStatement()) {
+                if (!statement.execute(validationQuery)) {
+                    throw new SQLException("execute check azure sql token client failed : " + validationQuery);
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } finally {
+                log.info("Time to execute check azure sql token client with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        }
+    }
 }

+ 30 - 0
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

@@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SEC
 
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -32,9 +33,13 @@ import sun.security.krb5.Config;
 import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.SQLException;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.jdbc.core.JdbcTemplate;
+
 @Slf4j
 public class HiveDataSourceClient extends CommonDataSourceClient {
 
@@ -47,6 +52,17 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         log.info("PreInit in {}", getClass().getName());
     }
 
+    @Override
+    protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+        log.info("Create UserGroupInformation.");
+        UserGroupInformationFactory.login(baseConnectionParam.getUser());
+        log.info("Create ugi success.");
+
+        this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+        log.info("Init {} success.", getClass().getName());
+    }
+
     @Override
     protected void checkEnv(BaseConnectionParam baseConnectionParam) {
         super.checkEnv(baseConnectionParam);
@@ -70,6 +86,20 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         }
     }
 
+    @Override
+    public Connection getConnection() {
+        Connection connection = null;
+        while (connection == null) {
+            try {
+                connection = dataSource.getConnection();
+            } catch (SQLException e) {
+                UserGroupInformationFactory.logout(baseConnectionParam.getUser());
+                UserGroupInformationFactory.login(baseConnectionParam.getUser());
+            }
+        }
+        return connection;
+    }
+
     @Override
     public void close() {
         try {

+ 27 - 0
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java

@@ -18,11 +18,17 @@
 package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
 
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.jdbc.core.JdbcTemplate;
+
 @Slf4j
 public class KyuubiDataSourceClient extends CommonDataSourceClient {
 
@@ -35,11 +41,32 @@ public class KyuubiDataSourceClient extends CommonDataSourceClient {
         log.info("PreInit in {}", getClass().getName());
     }
 
+    @Override
+    protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+
+        this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+        log.info("Init {} success.", getClass().getName());
+    }
+
     @Override
     protected void checkEnv(BaseConnectionParam baseConnectionParam) {
         super.checkEnv(baseConnectionParam);
     }
 
+    @Override
+    public Connection getConnection() {
+        Connection connection = null;
+        while (connection == null) {
+            try {
+                connection = dataSource.getConnection();
+            } catch (SQLException e) {
+                log.error("Failed to get Kyuubi Connection.", e);
+            }
+        }
+        return connection;
+    }
+
     @Override
     public void close() {
         super.close();

+ 8 - 0
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
 
 import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.sql.Connection;
 
@@ -48,6 +49,13 @@ public class KyuubiDataSourceClientTest {
         Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam);
     }
 
+    @Test
+    public void testInitClient() {
+        KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam();
+        kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI);
+        Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI);
+    }
+
     @Test
     public void testCheckClient() {
         kyuubiDataSourceClient.checkClient();

+ 47 - 0
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java

@@ -18,11 +18,21 @@
 package org.apache.dolphinscheduler.plugin.datasource.redshift;
 
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode;
+import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam;
+import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
 import lombok.extern.slf4j.Slf4j;
 
+import com.google.common.base.Stopwatch;
+
 @Slf4j
 public class RedshiftDataSourceClient extends CommonDataSourceClient {
 
@@ -30,4 +40,41 @@ public class RedshiftDataSourceClient extends CommonDataSourceClient {
         super(baseConnectionParam, dbType);
     }
 
+    @Override
+    public Connection getConnection() {
+        RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
+        if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
+            return super.getConnection();
+        }
+        return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam);
+    }
+
+    @Override
+    public void checkClient() {
+        RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String validationQuery = this.baseConnectionParam.getValidationQuery();
+        if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
+            // Checking data source client
+            try {
+                this.jdbcTemplate.execute(validationQuery);
+            } catch (Exception e) {
+                throw new RuntimeException("JDBC connect failed", e);
+            } finally {
+                log.info("Time to execute check jdbc client with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        } else {
+            try (Statement statement = getConnection().createStatement()) {
+                if (!statement.execute(validationQuery)) {
+                    throw new SQLException("execute check redshift access key failed : " + validationQuery);
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } finally {
+                log.info("Time to execute check redshift access key with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        }
+    }
 }