Browse Source

[Feature-5087][SqlTask] Add a switch to send mail and print head logs in SqlTask (#5088)

* [Feature-5087][SqlTask] Add the switch to send mail and print head logs in SqlTask

* [Feature-5087][SqlTask] Add SqlParametersTest
Shiwen Cheng 4 years ago
parent
commit
8ac72e80e6

+ 5 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -331,6 +331,11 @@ public final class Constants {
      */
     public static final Pattern REGEX_MAIL_NAME = Pattern.compile("^([a-z0-9A-Z]+[_|\\-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$");
 
+    /**
+     * default display rows
+     */
+    public static final int DEFAULT_DISPLAY_ROWS = 10;
+
     /**
      * read permission
      */

+ 28 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java

@@ -50,6 +50,16 @@ public class SqlParameters extends AbstractParameters {
      */
     private int sqlType;
 
+    /**
+     * send email
+     */
+    private Boolean sendEmail;
+
+    /**
+     * display rows
+     */
+    private int displayRows;
+
     /**
      * udf list
      */
@@ -124,6 +134,22 @@ public class SqlParameters extends AbstractParameters {
         this.sqlType = sqlType;
     }
 
+    public Boolean getSendEmail() {
+        return sendEmail;
+    }
+
+    public void setSendEmail(Boolean sendEmail) {
+        this.sendEmail = sendEmail;
+    }
+
+    public int getDisplayRows() {
+        return displayRows;
+    }
+
+    public void setDisplayRows(int displayRows) {
+        this.displayRows = displayRows;
+    }
+
     public String getShowType() {
         return showType;
     }
@@ -189,6 +215,8 @@ public class SqlParameters extends AbstractParameters {
                 + ", datasource=" + datasource
                 + ", sql='" + sql + '\''
                 + ", sqlType=" + sqlType
+                + ", sendEmail=" + sendEmail
+                + ", displayRows=" + displayRows
                 + ", udfs='" + udfs + '\''
                 + ", showType='" + showType + '\''
                 + ", connParams='" + connParams + '\''

+ 68 - 0
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task;
+
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SqlParametersTest {
+
+    private final String type = "MYSQL";
+    private final String sql = "select * from t_ds_user";
+    private final String udfs = "test-udfs-1.0.0-SNAPSHOT.jar";
+    private final int datasource = 1;
+    private final int sqlType = 0;
+    private final Boolean sendEmail = true;
+    private final int displayRows = 10;
+    private final String showType = "TABLE";
+    private final String title = "sql test";
+    private final int groupId = 0;
+
+    @Test
+    public void testSqlParameters() {
+        SqlParameters sqlParameters = new SqlParameters();
+        Assert.assertTrue(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList()));
+
+        sqlParameters.setType(type);
+        sqlParameters.setSql(sql);
+        sqlParameters.setUdfs(udfs);
+        sqlParameters.setDatasource(datasource);
+        sqlParameters.setSqlType(sqlType);
+        sqlParameters.setSendEmail(sendEmail);
+        sqlParameters.setDisplayRows(displayRows);
+        sqlParameters.setShowType(showType);
+        sqlParameters.setTitle(title);
+        sqlParameters.setGroupId(groupId);
+
+        Assert.assertEquals(type, sqlParameters.getType());
+        Assert.assertEquals(sql, sqlParameters.getSql());
+        Assert.assertEquals(udfs, sqlParameters.getUdfs());
+        Assert.assertEquals(datasource, sqlParameters.getDatasource());
+        Assert.assertEquals(sqlType, sqlParameters.getSqlType());
+        Assert.assertEquals(sendEmail, sqlParameters.getSendEmail());
+        Assert.assertEquals(displayRows, sqlParameters.getDisplayRows());
+        Assert.assertEquals(showType, sqlParameters.getShowType());
+        Assert.assertEquals(title, sqlParameters.getTitle());
+        Assert.assertEquals(groupId, sqlParameters.getGroupId());
+
+        Assert.assertTrue(sqlParameters.checkParameters());
+    }
+}

+ 17 - 11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@@ -149,13 +149,13 @@ public class SqlTask extends AbstractTask {
                     logger);
 
             // execute sql task
-            executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs, sqlParameters.getLocalParams());
+            executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
 
             setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
 
         } catch (Exception e) {
             setExitStatusCode(Constants.EXIT_CODE_FAILURE);
-            logger.error("sql task error", e);
+            logger.error("sql task error: {}", e.toString());
             throw e;
         }
     }
@@ -238,8 +238,7 @@ public class SqlTask extends AbstractTask {
     public void executeFuncAndSql(SqlBinds mainSqlBinds,
                                   List<SqlBinds> preStatementsBinds,
                                   List<SqlBinds> postStatementsBinds,
-                                  List<String> createFuncs,
-                                  List<Property> properties) {
+                                  List<String> createFuncs) throws Exception {
         Connection connection = null;
         PreparedStatement stmt = null;
         ResultSet resultSet = null;
@@ -268,15 +267,15 @@ public class SqlTask extends AbstractTask {
             } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
                 // non query statement
                 String updateResult = String.valueOf(stmt.executeUpdate());
-                result = setNonQuerySqlReturn(updateResult, properties);
+                result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
             }
 
             postSql(connection, postStatementsBinds);
             this.setResultString(result);
 
         } catch (Exception e) {
-            logger.error("execute sql error", e);
-            throw new RuntimeException("execute sql error");
+            logger.error("execute sql error: {}", e.getMessage());
+            throw e;
         } finally {
             close(resultSet, stmt, connection);
         }
@@ -319,12 +318,19 @@ public class SqlTask extends AbstractTask {
             rowCount++;
         }
         String result = JSONUtils.toJsonString(resultJSONArray);
-        logger.debug("execute sql : {}", result);
-        try {
+        logger.debug("execute sql result : {}", result);
+
+        int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : Constants.DEFAULT_DISPLAY_ROWS;
+        displayRows = Math.min(displayRows, resultJSONArray.size());
+        logger.info("display sql result {} rows as follows:", displayRows);
+        for (int i = 0; i < displayRows; i++) {
+            String row = JSONUtils.toJsonString(resultJSONArray.get(i));
+            logger.info("row {} : {}", i + 1, row);
+        }
+
+        if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) {
             sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
                     JSONUtils.toJsonString(resultJSONArray));
-        } catch (Exception e) {
-            logger.warn("sql task sendAttachment error! msg : {} ", e.getMessage());
         }
         return result;
     }

+ 28 - 6
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue

@@ -32,9 +32,18 @@
         <div style="display: inline-block;">
           <m-sql-type @on-sqlType="_onSqlType" :sql-type="sqlType"></m-sql-type>
         </div>
+        <div style="display: inline-block;" v-if="sqlType === '0'">
+          <span class="text-b">{{$t('Send Email')}}</span>
+          <el-switch size="small" v-model="sendEmail"></el-switch>
+        </div>
+        <div style="display: inline-block;" v-if="sqlType === '0'">
+          <span class="text-b">{{$t('Display query result')}}</span>
+          <m-select-input v-model="displayRows" :list="[1,10,25,50,100]" style="width: 70px;"></m-select-input>
+          <span>({{$t('Rows')}})</span>
+        </div>
       </div>
     </m-list-box>
-    <template v-if="sqlType === 0">
+    <template v-if="sqlType === '0' && sendEmail">
       <m-list-box>
         <div slot="text"><strong class='requiredIcon'>*</strong>{{$t('Title')}}</div>
         <div slot="content">
@@ -54,7 +63,7 @@
         </div>
       </m-list-box>
     </template>
-    <m-list-box v-if="type === 'HIVE'">
+    <m-list-box v-show="type === 'HIVE'">
       <div slot="text">{{$t('SQL Parameter')}}</div>
       <div slot="content">
         <el-input
@@ -141,6 +150,7 @@
   import mLocalParams from './_source/localParams'
   import mStatementList from './_source/statementList'
   import mWarningGroups from './_source/warningGroups'
+  import mSelectInput from '../_source/selectInput'
   import disabledState from '@/module/mixin/disabledState'
   import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
 
@@ -164,6 +174,10 @@
         udfs: '',
         // Sql type
         sqlType: '0',
+        // Send email
+        sendEmail: false,
+        // Display rows
+        displayRows: 10,
         // Email title
         title: '',
         // Sql parameter
@@ -240,11 +254,11 @@
         if (!this.$refs.refDs._verifDatasource()) {
           return false
         }
-        if (this.sqlType === '0' && !this.title) {
+        if (this.sqlType === '0' && this.sendEmail && !this.title) {
           this.$message.warning(`${i18n.$t('Mail subject required')}`)
           return false
         }
-        if (this.sqlType === '0' && (this.groupId === '' || this.groupId === null)) {
+        if (this.sqlType === '0' && this.sendEmail && (this.groupId === '' || this.groupId === null)) {
           this.$message.warning(`${i18n.$t('Alarm group required')}`)
           return false
         }
@@ -277,6 +291,8 @@
           sql: editor.getValue(),
           udfs: this.udfs,
           sqlType: this.sqlType,
+          sendEmail: this.sendEmail,
+          displayRows: this.displayRows,
           title: this.title,
           groupId: this.groupId,
           localParams: this.localParams,
@@ -326,6 +342,8 @@
           sql: editor ? editor.getValue() : '',
           udfs: this.udfs,
           sqlType: this.sqlType,
+          sendEmail: this.sendEmail,
+          displayRows: this.displayRows,
           title: this.title,
           groupId: this.groupId,
           localParams: this.localParams,
@@ -345,7 +363,7 @@
     watch: {
       // Listening to sqlType
       sqlType (val) {
-        if (val !== 0) {
+        if (val !== '0') {
           this.title = ''
           this.groupId = null
         }
@@ -372,6 +390,8 @@
         this.sql = o.params.sql || ''
         this.udfs = o.params.udfs || ''
         this.sqlType = o.params.sqlType
+        this.sendEmail = o.params.sendEmail || false
+        this.displayRows = o.params.displayRows || 10
         this.connParams = o.params.connParams || ''
         this.localParams = o.params.localParams || []
         this.preStatements = o.params.preStatements || []
@@ -402,6 +422,8 @@
           datasource: this.rtDatasource,
           udfs: this.udfs,
           sqlType: this.sqlType,
+          sendEmail: this.sendEmail,
+          displayRows: this.displayRows,
           title: this.title,
           groupId: this.groupId,
           localParams: this.localParams,
@@ -411,6 +433,6 @@
         }
       }
     },
-    components: { mListBox, mDatasource, mLocalParams, mUdfs, mSqlType, mStatementList, mScriptBox, mWarningGroups }
+    components: { mListBox, mDatasource, mLocalParams, mUdfs, mSqlType, mStatementList, mScriptBox, mWarningGroups, mSelectInput }
   }
 </script>

+ 3 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@@ -130,6 +130,9 @@ export default {
   'App Name': 'App Name',
   'Please enter app name(optional)': 'Please enter app name(optional)',
   'SQL Type': 'SQL Type',
+  'Send Email': 'Send Email',
+  'Display query result': 'Display query result',
+  Rows: 'Rows',
   Title: 'Title',
   'Please enter the title of email': 'Please enter the title of email',
   Table: 'Table',

+ 3 - 0
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -130,6 +130,9 @@ export default {
   'App Name': '任务名称',
   'Please enter app name(optional)': '请输入任务名称(选填)',
   'SQL Type': 'sql类型',
+  'Send Email': '发送邮件',
+  'Display query result': '展示查询结果',
+  Rows: '行',
   Title: '主题',
   'Please enter the title of email': '请输入邮件主题',
   Table: '表名',

+ 1 - 0
pom.xml

@@ -854,6 +854,7 @@
                         <include>**/common/task/EntityTestUtils.java</include>
                         <include>**/common/task/FlinkParametersTest.java</include>
                         <include>**/common/task/HttpParametersTest.java</include>
+                        <include>**/common/task/SqlParametersTest.java</include>
                         <include>**/common/task/SqoopParameterEntityTest.java</include>
                         <include>**/common/threadutils/ThreadPoolExecutorsTest.java</include>
                         <include>**/common/threadutils/ThreadUtilsTest.java</include>