Browse Source

Use bigint represent term in mysql registry to avoid time inaccurate (#13082)

Wenjun Ruan 2 years ago
parent
commit
4ce9c82cba

+ 105 - 32
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java

@@ -29,11 +29,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.Statement;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Optional;
+
+import lombok.NonNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +74,8 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException {
-        String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data";
+        String sql =
+                "select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data";
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql);
@@ -84,6 +87,7 @@ public class MysqlOperator implements AutoCloseable {
                         .key(resultSet.getString("key"))
                         .data(resultSet.getString("data"))
                         .type(resultSet.getInt("type"))
+                        .lastTerm(resultSet.getLong("last_term"))
                         .createTime(resultSet.getTimestamp("create_time"))
                         .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
                         .build();
@@ -93,24 +97,75 @@ public class MysqlOperator implements AutoCloseable {
         }
     }
 
-    public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
+    public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
+        Optional<MysqlRegistryData> mysqlRegistryDataOptional = selectByKey(key);
+        if (mysqlRegistryDataOptional.isPresent()) {
+            long id = mysqlRegistryDataOptional.get().getId();
+            if (!updateValueById(id, value)) {
+                throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
+            }
+            return id;
+        }
+        MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
+                .key(key)
+                .data(value)
+                .type(DataType.EPHEMERAL.getTypeValue())
+                .lastTerm(System.currentTimeMillis())
+                .build();
+        return insertMysqlRegistryData(mysqlRegistryData);
+    }
+
+    private Optional<MysqlRegistryData> selectByKey(@NonNull String key) throws SQLException {
         String sql =
-                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)"
-                        +
-                        "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
-        // put a ephemeralData
+                "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (resultSet.next()) {
+                    return Optional.of(
+                            MysqlRegistryData.builder()
+                                    .id(resultSet.getLong("id"))
+                                    .key(resultSet.getString("key"))
+                                    .data(resultSet.getString("data"))
+                                    .type(resultSet.getInt("type"))
+                                    .createTime(resultSet.getTimestamp("create_time"))
+                                    .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
+                                    .build());
+                }
+                return Optional.empty();
+            }
+        }
+    }
+
+    private boolean updateValueById(long id, String value) throws SQLException {
+        String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, value);
+            preparedStatement.setLong(2, System.currentTimeMillis());
+            preparedStatement.setLong(3, id);
+            return preparedStatement.executeUpdate() > 0;
+        }
+    }
+
+    private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException {
+        String sql =
+                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)";
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement =
                         connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
-            preparedStatement.setString(1, key);
-            preparedStatement.setString(2, value);
-            preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
-            preparedStatement.setString(4, value);
+            preparedStatement.setString(1, mysqlRegistryData.getKey());
+            preparedStatement.setString(2, mysqlRegistryData.getData());
+            preparedStatement.setInt(3, mysqlRegistryData.getType());
+            preparedStatement.setLong(4, mysqlRegistryData.getLastTerm());
             int insertCount = preparedStatement.executeUpdate();
             ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
             if (insertCount < 1 || !generatedKeys.next()) {
-                throw new SQLException("Insert or update ephemeral data error");
+                throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData);
             }
             return generatedKeys.getLong(1);
         }
@@ -118,18 +173,21 @@ public class MysqlOperator implements AutoCloseable {
 
     public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
         String sql =
-                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)"
+                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)"
                         +
-                        "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
+                        "ON DUPLICATE KEY UPDATE data=?, last_term=?";
         // put a persistent Data
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement =
                         connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+            long term = System.currentTimeMillis();
             preparedStatement.setString(1, key);
             preparedStatement.setString(2, value);
             preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
-            preparedStatement.setString(4, value);
+            preparedStatement.setLong(4, term);
+            preparedStatement.setString(5, value);
+            preparedStatement.setLong(6, term);
             int insertCount = preparedStatement.executeUpdate();
             ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
             if (insertCount < 1 || !generatedKeys.next()) {
@@ -176,8 +234,7 @@ public class MysqlOperator implements AutoCloseable {
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setTimestamp(1,
-                    new Timestamp(System.currentTimeMillis() - expireTimeWindow));
+            preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
             int i = preparedStatement.executeUpdate();
             if (i > 0) {
                 logger.info("Clear expire lock, size: {}", i);
@@ -188,11 +245,11 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public void clearExpireEphemeralDate() {
-        String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?";
+        String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?";
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow));
+            preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
             preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
             int i = preparedStatement.executeUpdate();
             if (i > 0) {
@@ -205,7 +262,7 @@ public class MysqlOperator implements AutoCloseable {
 
     public MysqlRegistryData getData(String key) throws SQLException {
         String sql =
-                "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
+                "SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
@@ -219,6 +276,7 @@ public class MysqlOperator implements AutoCloseable {
                         .key(resultSet.getString("key"))
                         .data(resultSet.getString("data"))
                         .type(resultSet.getInt("type"))
+                        .lastTerm(resultSet.getLong("last_term"))
                         .createTime(resultSet.getTimestamp("create_time"))
                         .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
                         .build();
@@ -265,13 +323,14 @@ public class MysqlOperator implements AutoCloseable {
      */
     public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
         String sql =
-                "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)";
+                "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)";
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement =
                         connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
             preparedStatement.setString(1, key);
             preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
+            preparedStatement.setLong(3, System.currentTimeMillis());
             preparedStatement.executeUpdate();
             try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                 if (resultSet.next()) {
@@ -299,7 +358,7 @@ public class MysqlOperator implements AutoCloseable {
                             .id(resultSet.getLong("id"))
                             .key(resultSet.getString("key"))
                             .lockOwner(resultSet.getString("lock_owner"))
-                            .lastTerm(resultSet.getTimestamp("last_term"))
+                            .lastTerm(resultSet.getLong("last_term"))
                             .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
                             .createTime(resultSet.getTimestamp("create_time"))
                             .build();
@@ -322,24 +381,38 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
-        String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)";
-        String ids = ephemeralDateIds.stream().map(String::valueOf).collect(Collectors.joining(","));
+        StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN (");
+        Iterator<Long> iterator = ephemeralDateIds.iterator();
+        for (int i = 0; i < ephemeralDateIds.size(); i++) {
+            sb.append(iterator.next());
+            if (i != ephemeralDateIds.size() - 1) {
+                sb.append(",");
+            }
+        }
+        sb.append(")");
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, ids);
+                PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
+            preparedStatement.setLong(1, System.currentTimeMillis());
             return preparedStatement.executeUpdate() > 0;
         }
     }
 
     public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
-        String sql =
-                "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
-        String ids = lockIds.stream().map(String::valueOf).collect(Collectors.joining(","));
+        StringBuilder sb =
+                new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN (");
+        Iterator<Long> iterator = lockIds.iterator();
+        for (int i = 0; i < lockIds.size(); i++) {
+            sb.append(iterator.next());
+            if (i != lockIds.size() - 1) {
+                sb.append(",");
+            }
+        }
+        sb.append(")");
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, ids);
+                PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
+            preparedStatement.setLong(1, System.currentTimeMillis());
             return preparedStatement.executeUpdate() > 0;
         }
     }

+ 1 - 0
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java

@@ -34,6 +34,7 @@ public class MysqlRegistryData {
     private String key;
     private String data;
     private int type;
+    private long lastTerm;
     private Date createTime;
     private Date lastUpdateTime;
 

+ 1 - 1
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java

@@ -42,7 +42,7 @@ public class MysqlRegistryLock {
     /**
      * The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
      */
-    private Date lastTerm;
+    private Long lastTerm;
     /**
      * The lock last update time.
      */

+ 12 - 11
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql

@@ -20,12 +20,13 @@ SET FOREIGN_KEY_CHECKS = 0;
 DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
 CREATE TABLE `t_ds_mysql_registry_data`
 (
-    `id`               bigint(11)   NOT NULL AUTO_INCREMENT COMMENT 'primary key',
-    `key`              varchar(200) NOT NULL COMMENT 'key, like zookeeper node path',
+    `id`               bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `key`              varchar(256) NOT NULL COMMENT 'key, like zookeeper node path',
     `data`             text         NOT NULL COMMENT 'data, like zookeeper node value',
-    `type`             tinyint(4)   NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
-    `last_update_time` timestamp    NULL COMMENT 'last update time',
-    `create_time`      timestamp    NULL COMMENT 'create time',
+    `type`             tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
+    `last_term`        bigint       NOT NULL COMMENT 'last term time',
+    `last_update_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
     PRIMARY KEY (`id`),
     unique (`key`)
 ) ENGINE = InnoDB
@@ -35,12 +36,12 @@ CREATE TABLE `t_ds_mysql_registry_data`
 DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
 CREATE TABLE `t_ds_mysql_registry_lock`
 (
-    `id`               bigint(11)   NOT NULL AUTO_INCREMENT COMMENT 'primary key',
-    `key`              varchar(200) NOT NULL COMMENT 'lock path',
-    `lock_owner`       varchar(100) NOT NULL COMMENT 'the lock owner, ip_processId',
-    `last_term`        timestamp    NOT NULL COMMENT 'last term time',
-    `last_update_time` timestamp    NULL COMMENT 'last update time',
-    `create_time`      timestamp    NULL COMMENT 'lock create time',
+    `id`               bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+    `key`              varchar(256) NOT NULL COMMENT 'lock path',
+    `lock_owner`       varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId',
+    `last_term`        bigint       NOT NULL COMMENT 'last term time',
+    `last_update_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
     PRIMARY KEY (`id`),
     unique (`key`)
 ) ENGINE = InnoDB