
[improvement] Improve DataQuality module (#14463)

jegger 1 year ago
commit 75d29f6101

+ 9 - 1
dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java

@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.data.quality;
 
 import static org.apache.dolphinscheduler.data.quality.Constants.SPARK_APP_NAME;
+import static org.apache.dolphinscheduler.data.quality.enums.ReaderType.HIVE;
 
 import org.apache.dolphinscheduler.data.quality.config.Config;
 import org.apache.dolphinscheduler.data.quality.config.DataQualityConfiguration;
@@ -64,9 +65,16 @@ public class DataQualityApplication {
             config.put(SPARK_APP_NAME, dataQualityConfiguration.getName());
         }
 
-        SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config);
+        boolean hiveClientSupport = dataQualityConfiguration
+                .getReaderConfigs()
+                .stream()
+                .anyMatch(line -> line.getType().equalsIgnoreCase(HIVE.name()));
+
+        SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config, hiveClientSupport);
+
         DataQualityContext dataQualityContext =
                 new DataQualityContext(sparkRuntimeEnvironment, dataQualityConfiguration);
+
         dataQualityContext.execute();
     }
 }

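For context, a minimal sketch of the detection added above, using plain strings in place of the project's ReaderConfig/ReaderType classes (an assumption for illustration): Hive support is requested only when at least one configured reader has type HIVE.

import java.util.List;

// Illustrative sketch of the check added in DataQualityApplication:
// reader types come from the job configuration; matching is
// case-insensitive, mirroring equalsIgnoreCase in the change above.
public class HiveSupportCheckSketch {

    static boolean needsHiveSupport(List<String> readerTypes) {
        return readerTypes.stream().anyMatch(type -> "HIVE".equalsIgnoreCase(type));
    }

    public static void main(String[] args) {
        System.out.println(needsHiveSupport(List.of("JDBC")));         // false
        System.out.println(needsHiveSupport(List.of("JDBC", "hive"))); // true
    }
}
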
+ 7 - 4
dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/execution/SparkRuntimeEnvironment.java

@@ -34,20 +34,23 @@ public class SparkRuntimeEnvironment {
 
     private Config config = new Config();
 
-    public SparkRuntimeEnvironment(Config config) {
+    public SparkRuntimeEnvironment(Config config, boolean hiveClientSupport) {
         if (config != null) {
             this.config = config;
         }
 
-        this.prepare();
+        this.prepare(hiveClientSupport);
     }
 
     public Config getConfig() {
         return this.config;
     }
 
-    public void prepare() {
-        sparkSession = SparkSession.builder().config(createSparkConf()).enableHiveSupport().getOrCreate();
+    public void prepare(boolean hiveClientSupport) {
+        SparkSession.Builder sparkSessionBuilder = SparkSession.builder().config(createSparkConf());
+
+        this.sparkSession = hiveClientSupport ? sparkSessionBuilder.enableHiveSupport().getOrCreate()
+                : sparkSessionBuilder.getOrCreate();
     }
 
     private SparkConf createSparkConf() {

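A condensed sketch of the conditional session construction above, assuming spark-sql is on the classpath and spark-hive is available only when Hive support is requested; everything except the SparkSession builder API is illustrative.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Sketch of the builder logic in SparkRuntimeEnvironment.prepare():
// enableHiveSupport() needs the spark-hive module and a reachable metastore,
// so it is applied only when a HIVE reader was detected.
public class SparkSessionFactorySketch {

    static SparkSession create(SparkConf conf, boolean hiveClientSupport) {
        SparkSession.Builder builder = SparkSession.builder().config(conf);
        return hiveClientSupport
                ? builder.enableHiveSupport().getOrCreate()
                : builder.getOrCreate();
    }
}

Keeping enableHiveSupport() conditional means a job that only uses non-Hive readers no longer needs Hive classes or a metastore at runtime.
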
+ 2 - 1
dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java

@@ -41,6 +41,7 @@ public class SparkApplicationTestBase {
         config.put("spark.ui.port", 13000);
         config.put("spark.master", "local[4]");
 
-        sparkRuntimeEnvironment = new SparkRuntimeEnvironment(new Config(config));
+        // Disable the Hive client by default so the unit test can run locally without a Hive environment.
+        sparkRuntimeEnvironment = new SparkRuntimeEnvironment(new Config(config), false);
     }
 }

+ 5 - 2
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java

@@ -39,6 +39,9 @@ public class SparkArgsUtils {
 
     private static final String SPARK_ON_YARN = "yarn";
 
+    private static final String DEFAULT_QUALITY_CLASS =
+            "org.apache.dolphinscheduler.data.quality.DataQualityApplication";
+
     private SparkArgsUtils() {
         throw new IllegalStateException("Utility class");
     }
@@ -62,9 +65,9 @@ public class SparkArgsUtils {
 
         ProgramType programType = param.getProgramType();
         String mainClass = param.getMainClass();
-        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
+        if (programType != null && programType != ProgramType.PYTHON) {
             args.add(SparkConstants.MAIN_CLASS);
-            args.add(mainClass);
+            args.add(StringUtils.isNotEmpty(mainClass) ? mainClass : DEFAULT_QUALITY_CLASS);
         }
 
         int driverCores = param.getDriverCores();
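An illustrative stand-in for the fallback above: when the program type is not PYTHON and no main class is supplied, the data-quality application class is used. The literal "--class" flag and the helper name below are assumptions; only the default class constant comes from the change.

import java.util.ArrayList;
import java.util.List;

// Stand-in for the SparkArgsUtils fallback; "--class" and the method name are
// illustrative assumptions, DEFAULT_QUALITY_CLASS matches the change above.
public class MainClassArgsSketch {

    private static final String DEFAULT_QUALITY_CLASS =
            "org.apache.dolphinscheduler.data.quality.DataQualityApplication";

    static List<String> mainClassArgs(String programType, String mainClass) {
        List<String> args = new ArrayList<>();
        if (programType != null && !"PYTHON".equals(programType)) {
            args.add("--class");
            args.add(mainClass != null && !mainClass.isEmpty() ? mainClass : DEFAULT_QUALITY_CLASS);
        }
        return args;
    }

    public static void main(String[] args) {
        // An empty main class falls back to the data-quality application class.
        System.out.println(mainClassArgs("JAVA", ""));
        System.out.println(mainClassArgs("JAVA", "com.example.MyJob"));
    }
}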