Sfoglia il codice sorgente

Merge branch 'dev' into fix-e2e-docker

qiaozhanwei 5 anni fa
parent
commit
459e40bf19

+ 53 - 0
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/DingTalkManager.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.alert.manager;
+
+import org.apache.dolphinscheduler.alert.utils.Constants;
+import org.apache.dolphinscheduler.alert.utils.DingTalkUtils;
+import org.apache.dolphinscheduler.plugin.model.AlertInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Ding Talk Manager
+ */
+public class DingTalkManager {
+    private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class);
+
+    public Map<String,Object> send(AlertInfo alert) {
+        Map<String,Object> retMap = new HashMap<>();
+        retMap.put(Constants.STATUS, false);
+        logger.info("send message {}", alert.getAlertData().getTitle());
+        try {
+            String msg = buildMessage(alert);
+            DingTalkUtils.sendDingTalkMsg(msg, Constants.UTF_8);
+        } catch (IOException e) {
+            logger.error(e.getMessage(),e);
+        }
+        retMap.put(Constants.STATUS, true);
+        return retMap;
+    }
+
+    private String buildMessage(AlertInfo alert) {
+        String msg = alert.getAlertData().getContent();
+        return msg;
+    }
+}

+ 8 - 0
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPlugin.java

@@ -16,9 +16,11 @@
  */
 package org.apache.dolphinscheduler.alert.plugin;
 
+import org.apache.dolphinscheduler.alert.manager.DingTalkManager;
 import org.apache.dolphinscheduler.alert.manager.EmailManager;
 import org.apache.dolphinscheduler.alert.manager.EnterpriseWeChatManager;
 import org.apache.dolphinscheduler.alert.utils.Constants;
+import org.apache.dolphinscheduler.alert.utils.DingTalkUtils;
 import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -44,6 +46,7 @@ public class EmailAlertPlugin implements AlertPlugin {
 
     private static final EmailManager emailManager = new EmailManager();
     private static final EnterpriseWeChatManager weChatManager = new EnterpriseWeChatManager();
+    private static final DingTalkManager dingTalkManager = new DingTalkManager();
 
     public EmailAlertPlugin() {
         this.pluginName = new PluginName();
@@ -121,6 +124,11 @@ public class EmailAlertPlugin implements AlertPlugin {
                     logger.error(e.getMessage(), e);
                 }
             }
+            
+           if (DingTalkUtils.isEnableDingTalk) {
+                logger.info("Ding Talk is enable.");
+                 dingTalkManager.send(info);
+              }
 
         } else {
             retMaps.put(Constants.MESSAGE, "alert send error.");

+ 18 - 0
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java

@@ -156,6 +156,23 @@ public class Constants {
     public static final String ENTERPRISE_WECHAT_AGENT_ID = "enterprise.wechat.agent.id";
 
     public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users";
+    
+
+    public static final String DINGTALK_WEBHOOK = "dingtalk.webhook";
+
+    public static final String DINGTALK_KEYWORD = "dingtalk.keyword";
+
+    public static final String DINGTALK_PROXY_ENABLE = "dingtalk.isEnableProxy";
+
+    public static final String DINGTALK_PROXY = "dingtalk.proxy";
+
+    public static final String DINGTALK_PORT = "dingtalk.port";
+
+    public static final String DINGTALK_USER = "dingtalk.user";
+
+    public static final String DINGTALK_PASSWORD = "dingtalk.password";
+
+    public static final String DINGTALK_ENABLE = "dingtalk.isEnable";
 
     /**
      * plugin config
@@ -173,4 +190,5 @@ public class Constants {
     public static final String PLUGIN_DEFAULT_EMAIL_RECEIVERCCS = "receiverCcs";
 
     public static final String RETMAP_MSG = "msg";
+
 }

+ 136 - 0
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java

@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.alert.utils;
+
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * DingTalkUtils utils
+ * support send msg to ding talk by robot message push function.
+ * support proxy setting
+ */
+public class DingTalkUtils {
+    public static final Logger logger = LoggerFactory.getLogger(DingTalkUtils.class);
+
+    public static final boolean isEnableDingTalk = PropertyUtils.getBoolean(Constants.DINGTALK_ENABLE);
+    private static final String dingTaskUrl = PropertyUtils.getString(Constants.DINGTALK_WEBHOOK);
+    private static final String keyword = PropertyUtils.getString(Constants.DINGTALK_KEYWORD);
+    private static final Boolean isEnableProxy = PropertyUtils.getBoolean(Constants.DINGTALK_PROXY_ENABLE);
+    private static final String proxy = PropertyUtils.getString(Constants.DINGTALK_PROXY);
+    private static final String user = PropertyUtils.getString(Constants.DINGTALK_USER);
+    private static final String passwd = PropertyUtils.getString(Constants.DINGTALK_PASSWORD);
+    private static final Integer port = PropertyUtils.getInt(Constants.DINGTALK_PORT);
+
+    /**
+     * send message interface
+     * only support text message format now.
+     * @param msg message context to send
+     * @param charset charset type
+     * @return  result of sending msg
+     * @throws IOException the IOException
+     */
+    public static String sendDingTalkMsg(String msg, String charset) throws IOException {
+        String msgToJson = textToJsonString(msg + "#" + keyword);
+        HttpPost httpPost = constructHttpPost(msgToJson, charset);
+
+        CloseableHttpClient httpClient;
+        if (isEnableProxy) {
+            httpClient = getProxyClient();
+            RequestConfig rcf = getProxyConfig();
+            httpPost.setConfig(rcf);
+        } else {
+            httpClient = getDefaultClient();
+        }
+
+        try {
+            CloseableHttpResponse response = httpClient.execute(httpPost);
+            String resp;
+            try {
+                HttpEntity entity = response.getEntity();
+                resp = EntityUtils.toString(entity, charset);
+                EntityUtils.consume(entity);
+            } finally {
+                response.close();
+            }
+            logger.info("Ding Talk send [{}], resp:{%s}", msg, resp);
+            return resp;
+        }  finally {
+            httpClient.close();
+        }
+    }
+
+    public static HttpPost constructHttpPost(String msg, String charset) {
+        HttpPost post =  new HttpPost(dingTaskUrl);
+        StringEntity entity = new StringEntity(msg, charset);
+        post.setEntity(entity);
+        post.addHeader("Content-Type", "application/json; charset=utf-8");
+        return post;
+    }
+
+
+    public static CloseableHttpClient getProxyClient() {
+        HttpHost httpProxy = new HttpHost(proxy, port);
+        CredentialsProvider provider = new BasicCredentialsProvider();
+        provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, passwd));
+        CloseableHttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(provider).build();
+        return httpClient;
+    }
+
+    public static CloseableHttpClient getDefaultClient() {
+        return HttpClients.createDefault();
+    }
+
+    public static RequestConfig getProxyConfig() {
+        HttpHost httpProxy = new HttpHost(proxy, port);
+        return RequestConfig.custom().setProxy(httpProxy).build();
+    }
+
+    public static String textToJsonString(String text) {
+        Map<String, Object> items = new HashMap<String, Object>();
+        items.put("msgtype", "text");
+        Map<String, String> textContent = new HashMap<String, String>();
+        byte[] byt = StringUtils.getBytesUtf8(text);
+        String txt = StringUtils.newStringUtf8(byt);
+        textContent.put("content", txt);
+        items.put("text", textContent);
+
+        return JSON.toJSONString(items);
+
+    }
+
+}

+ 11 - 0
dolphinscheduler-alert/src/main/resources/alert.properties

@@ -36,6 +36,7 @@ mail.smtp.ssl.trust=xxx.xxx.com
 
 # Enterprise WeChat configuration
 enterprise.wechat.enable=false
+
 #enterprise.wechat.corp.id=xxxxxxx
 #enterprise.wechat.secret=xxxxxxx
 #enterprise.wechat.agent.id=xxxxxxx
@@ -47,3 +48,13 @@ enterprise.wechat.enable=false
 
 plugin.dir=/Users/xx/your/path/to/plugin/dir
 
+#ding talk configuration
+dingtalk.isEnable=flase
+dingtalk.webhook=https://oapi.dingtalk.com/robot/send?access_token=xxxxx
+dingtalk.keyword=
+dingtalk.proxy=
+dingtalk.port=80
+dingtalk.user=
+dingtalk.password=
+dingtalk.isEnableProxy=false
+

+ 125 - 0
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java

@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.alert.utils;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+@PrepareForTest(PropertyUtils.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.net.ssl.*")
+public class DingTalkUtilsTest {
+    Logger logger = LoggerFactory.getLogger(DingTalkUtilsTest.class);
+
+    private static final String mockUrl = "https://oapi.dingtalk.com/robot/send?access_token=test";
+    private static final String mockKeyWords = "onway";
+    private static final String msg = "ding talk test";
+
+    @Before
+    public void init(){
+        PowerMockito.mockStatic(PropertyUtils.class);
+        Mockito.when(PropertyUtils.getString(Constants.DINGTALK_WEBHOOK)).thenReturn(mockUrl);
+        Mockito.when(PropertyUtils.getString(Constants.DINGTALK_KEYWORD)).thenReturn(mockKeyWords);
+        Mockito.when(PropertyUtils.getBoolean(Constants.DINGTALK_PROXY_ENABLE)).thenReturn(true);
+        Mockito.when(PropertyUtils.getString(Constants.DINGTALK_PROXY)).thenReturn("proxy.com.cn");
+        Mockito.when(PropertyUtils.getString(Constants.DINGTALK_USER)).thenReturn("user");
+        Mockito.when(PropertyUtils.getString(Constants.DINGTALK_PASSWORD)).thenReturn("pswd");
+        Mockito.when(PropertyUtils.getInt(Constants.DINGTALK_PORT)).thenReturn(80);
+    }
+
+//    @Test
+//    @Ignore
+//    public void testSendMsg() {
+//        try {
+//           String msgTosend = "msg to send";
+//            logger.info(PropertyUtils.getString(Constants.DINGTALK_WEBHOOK));
+//           String rsp = DingTalkUtils.sendDingTalkMsg(msgTosend, Constants.UTF_8);
+//           logger.info("send msg result:{}",rsp);
+//            String errmsg = JSON.parseObject(rsp).getString("errmsg");
+//            Assert.assertEquals("ok", errmsg);
+//        }  catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+
+    @Test
+    public void testCreateDefaultClient() {
+        CloseableHttpClient client = DingTalkUtils.getDefaultClient();;
+        try {
+            Assert.assertNotNull(client);
+            client.close();
+        } catch (IOException ex) {
+            logger.info("close exception",ex.getMessage());
+            new Throwable();
+        }
+    }
+    @Test
+    public void testCreateProxyClient() {
+        CloseableHttpClient client = DingTalkUtils.getProxyClient();
+        try {
+            Assert.assertNotNull(client);
+            client.close();
+        } catch (IOException ex) {
+            logger.info("close exception",ex.getMessage());
+            new Throwable();
+        }
+
+    }
+    @Test
+    public void testProxyConfig() {
+        RequestConfig rc = DingTalkUtils.getProxyConfig();
+        Assert.assertEquals(rc.getProxy().getPort(), 80);
+        Assert.assertEquals(rc.getProxy().getHostName(), "proxy.com.cn");
+    }
+
+    @Test
+    public void testDingTalkMsgToJson() {
+        String jsonString = DingTalkUtils.textToJsonString("this is test");
+
+        logger.info(jsonString);
+        String expect = "{\"text\":{\"content\":\"this is test\"},\"msgtype\":\"text\"}";
+        Assert.assertEquals(expect, jsonString);
+    }
+    @Test
+    public void testDingTalkMsgUtf8() {
+        String msg = DingTalkUtils.textToJsonString("this is test:中文");
+
+        logger.info("test support utf8, actual:" + msg);
+        logger.info("test support utf8, actual:" + DingTalkUtils.isEnableDingTalk);
+        String expect = "{\"text\":{\"content\":\"this is test:中文\"},\"msgtype\":\"text\"}";
+        Assert.assertEquals(expect, msg);
+    }
+
+}

+ 1 - 0
pom.xml

@@ -687,6 +687,7 @@
                 <version>${maven-surefire-plugin.version}</version>
                 <configuration>
                     <includes>
+                       <include>**/alert/utils/DingTalkUtilsTest.java</include>
                         <include>**/alert/template/AlertTemplateFactoryTest.java</include>
                         <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
                         <include>**/alert/utils/EnterpriseWeChatUtilsTest.java</include>