Browse Source

[Feature-9204][alert] Implement alert send status (#9208)

* [DS-9204][feat][alert,dao] Implement alert send status
- implement alert send status
- add alert send status entity、mapper
- modify alert dao
- modify alert sender
- add test
- add sql

This closes #9204

* [DS-9204][feat][alert,dao] Implement alert send status
- add license header

This closes #9204
worry 3 years ago
parent
commit
2bab12f2c8

+ 15 - 3
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java

@@ -34,9 +34,11 @@ import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
 import org.apache.commons.collections.CollectionUtils;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,14 +74,24 @@ public final class AlertSender {
                      .setTitle(alert.getTitle())
                      .setWarnType(alert.getWarningType().getCode());
 
+            int sendSuccessCount = 0;
             for (AlertPluginInstance instance : alertInstanceList) {
-
                 AlertResult alertResult = this.alertResultHandler(instance, alertData);
                 if (alertResult != null) {
-                    AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
-                    alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId());
+                    AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
+                    alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alert.getId(),instance.getId());
+                    if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
+                        sendSuccessCount++;
+                    }
                 }
             }
+            AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
+            if (sendSuccessCount == 0) {
+                alertStatus = AlertStatus.EXECUTION_FAILURE;
+            } else if (sendSuccessCount < alertInstanceList.size()) {
+                alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
+            }
+            alertDao.updateAlert(alertStatus, "", alert.getId());
         }
 
     }

+ 2 - 1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertStatus.java

@@ -28,7 +28,8 @@ public enum AlertStatus {
      */
     WAIT_EXECUTION(0, "waiting executed"),
     EXECUTION_SUCCESS(1, "execute successfully"),
-    EXECUTION_FAILURE(2, "execute failed");
+    EXECUTION_FAILURE(2, "execute failed"),
+    EXECUTION_PARTIAL_SUCCESS(3, "execute partial successfully");
 
     AlertStatus(int code, String descp) {
         this.code = code;

+ 24 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
+import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
 import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
 import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
 import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -57,6 +59,9 @@ public class AlertDao {
     @Autowired
     private AlertGroupMapper alertGroupMapper;
 
+    @Autowired
+    private AlertSendStatusMapper alertSendStatusMapper;
+
     /**
      * insert alert
      *
@@ -83,6 +88,25 @@ public class AlertDao {
         return alertMapper.updateById(alert);
     }
 
+    /**
+     * add AlertSendStatus
+     *
+     * @param sendStatus alert send status
+     * @param log log
+     * @param alertId alert id
+     * @param alertPluginInstanceId alert plugin instance id
+     * @return insert count
+     */
+    public int addAlertSendStatus(AlertStatus sendStatus, String log, int alertId, int alertPluginInstanceId) {
+        AlertSendStatus alertSendStatus = new AlertSendStatus();
+        alertSendStatus.setAlertId(alertId);
+        alertSendStatus.setAlertPluginInstanceId(alertPluginInstanceId);
+        alertSendStatus.setSendStatus(sendStatus);
+        alertSendStatus.setLog(log);
+        alertSendStatus.setCreateTime(new Date());
+        return alertSendStatusMapper.insert(alertSendStatus);
+    }
+
     /**
      * MasterServer or WorkerServer stoped
      *

+ 145 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AlertSendStatus.java

@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import org.apache.dolphinscheduler.common.enums.AlertStatus;
+
+import java.util.Date;
+import java.util.StringJoiner;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.google.common.base.Objects;
+
+@TableName("t_ds_alert_send_status")
+public class AlertSendStatus {
+    /**
+     * primary key
+     */
+    @TableId(value = "id", type = IdType.AUTO)
+    private int id;
+
+    /**
+     * alert id
+     */
+    @TableField(value = "alert_id")
+    private int alertId;
+
+    /**
+     * alert plugin instance id
+     */
+    @TableField(value = "alert_plugin_instance_id")
+    private int alertPluginInstanceId;
+
+    /**
+     * alert send status
+     */
+    @TableField(value = "send_status")
+    private AlertStatus sendStatus;
+
+    /**
+     * log
+     */
+    @TableField(value = "log")
+    private String log;
+
+    /**
+     * create_time
+     */
+    @TableField("create_time")
+    private Date createTime;
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public int getAlertId() {
+        return alertId;
+    }
+
+    public void setAlertId(int alertId) {
+        this.alertId = alertId;
+    }
+
+    public int getAlertPluginInstanceId() {
+        return alertPluginInstanceId;
+    }
+
+    public void setAlertPluginInstanceId(int alertPluginInstanceId) {
+        this.alertPluginInstanceId = alertPluginInstanceId;
+    }
+
+    public AlertStatus getSendStatus() {
+        return sendStatus;
+    }
+
+    public void setSendStatus(AlertStatus sendStatus) {
+        this.sendStatus = sendStatus;
+    }
+
+    public String getLog() {
+        return log;
+    }
+
+    public void setLog(String log) {
+        this.log = log;
+    }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AlertSendStatus that = (AlertSendStatus) o;
+        return alertId == that.alertId && alertPluginInstanceId == that.alertPluginInstanceId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(alertId, alertPluginInstanceId);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", AlertSendStatus.class.getSimpleName() + "[", "]")
+            .add("id=" + id)
+            .add("alertId=" + alertId)
+            .add("alertPluginInstanceId=" + alertPluginInstanceId)
+            .add("sendStatus=" + sendStatus)
+            .add("log='" + log + "'")
+            .add("createTime=" + createTime)
+            .toString();
+    }
+}

+ 25 - 0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface AlertSendStatusMapper extends BaseMapper<AlertSendStatus> {
+}

+ 17 - 0
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@@ -1910,3 +1910,20 @@ CREATE TABLE t_ds_k8s_namespace (
 -- ----------------------------
 INSERT INTO t_ds_k8s_namespace
 VALUES (1, 10000, 'default', 99, 'owner',1,NULL,1,'test',NULL,'default',null,null);
+
+-- ----------------------------
+-- Table structure for t_ds_alert_send_status
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_alert_send_status CASCADE;
+CREATE TABLE t_ds_alert_send_status
+(
+    id                            int NOT NULL AUTO_INCREMENT,
+    alert_id                      int NOT NULL,
+    alert_plugin_instance_id      int NOT NULL,
+    send_status                   tinyint(4) DEFAULT '0',
+    log                           text,
+    create_time                   timestamp NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (id),
+    UNIQUE KEY alert_send_status_unique (alert_id,alert_plugin_instance_id)
+);
+

+ 16 - 0
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@@ -1897,3 +1897,19 @@ CREATE TABLE `t_ds_k8s_namespace` (
   PRIMARY KEY (`id`),
   UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
 ) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
+
+
+-- ----------------------------
+-- Table structure for t_ds_alert_send_status
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_alert_send_status;
+CREATE TABLE t_ds_alert_send_status (
+  `id`                            int(11) NOT NULL AUTO_INCREMENT,
+  `alert_id`                      int(11) NOT NULL,
+  `alert_plugin_instance_id`      int(11) NOT NULL,
+  `send_status`                   tinyint(4) DEFAULT '0',
+  `log`                           text,
+  `create_time`                   datetime DEFAULT NULL COMMENT 'create time',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

+ 15 - 0
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@@ -1894,3 +1894,18 @@ CREATE TABLE t_ds_k8s_namespace (
    PRIMARY KEY (id) ,
    CONSTRAINT k8s_namespace_unique UNIQUE (namespace,k8s)
 );
+
+-- ----------------------------
+-- Table structure for t_ds_alert_send_status
+-- ----------------------------
+DROP TABLE IF EXISTS t_ds_alert_send_status;
+CREATE TABLE t_ds_alert_send_status (
+    id                           serial NOT NULL,
+    alert_id                     int NOT NULL,
+    alert_plugin_instance_id     int NOT NULL,
+    send_status                  int DEFAULT '0',
+    log                          text,
+    create_time                  timestamp DEFAULT NULL,
+    PRIMARY KEY (id),
+    CONSTRAINT alert_send_status_unique UNIQUE (alert_id,alert_plugin_instance_id)
+);

+ 6 - 0
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java

@@ -61,6 +61,12 @@ public class AlertDaoTest {
         Assert.assertNotEquals(0, alerts.size());
     }
 
+    @Test
+    public void testAddAlertSendStatus() {
+        int insertCount = alertDao.addAlertSendStatus(AlertStatus.EXECUTION_SUCCESS,"success",1,1);
+        Assert.assertEquals(1, insertCount);
+    }
+
     @Test
     public void testSendServerStopedAlert() {
         int alertGroupId = 1;

+ 53 - 0
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapperTest.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+import org.apache.dolphinscheduler.common.enums.AlertStatus;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.dao.BaseDaoTest;
+import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
+
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * AlertSendStatus mapper test
+ */
+public class AlertSendStatusMapperTest extends BaseDaoTest {
+    @Autowired
+    private AlertSendStatusMapper alertSendStatusMapper;
+
+    /**
+     * test insert
+     */
+    @Test
+    public void testInsert() {
+        AlertSendStatus alertSendStatus = new AlertSendStatus();
+        alertSendStatus.setAlertId(1);
+        alertSendStatus.setAlertPluginInstanceId(1);
+        alertSendStatus.setSendStatus(AlertStatus.EXECUTION_SUCCESS);
+        alertSendStatus.setLog("success");
+        alertSendStatus.setCreateTime(DateUtils.getCurrentDate());
+
+        alertSendStatusMapper.insert(alertSendStatus);
+        assertThat(alertSendStatus.getId(), greaterThan(0));
+    }
+}