zyl 11 months ago
commit
7a97a595b1

+ 33 - 0
.gitignore

@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/

+ 120 - 0
pom.xml

@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.6.13</version>
+        <relativePath/> <!-- lookup parent from repository -->
+    </parent>
+    <groupId>com.citygis</groupId>
+    <artifactId>cdc_kafka_consumer</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>cdc_kafka_consumer</name>
+    <description>CDC Kafka consumer that relays chronic-disease topics and records daily send/receive counts</description>
+    <properties>
+        <java.version>1.8</java.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-tomcat</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.3</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>2.0.5</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
+        <dependency>
+            <groupId>io.springfox</groupId>
+            <artifactId>springfox-swagger2</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
+        <dependency>
+            <groupId>io.springfox</groupId>
+            <artifactId>springfox-swagger-ui</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>sqljdbc4</artifactId>
+            <version>4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>2.1.4</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.12</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.22</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <version>1.61</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <includeSystemScope>true</includeSystemScope>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 18 - 0
src/main/java/com/citygis/CdcKafkaConsumerApplication.java

@@ -0,0 +1,18 @@
+package com.citygis;
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+@MapperScan("com.citygis.mapper")
+public class CdcKafkaConsumerApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(CdcKafkaConsumerApplication.class, args);
+    }
+
+}

+ 68 - 0
src/main/java/com/citygis/config/KafkaConsumerConfig.java

@@ -0,0 +1,68 @@
+package com.citygis.config;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Kafka consumer configuration (broker 172.27.128.180).
+ */
+@Configuration
+@EnableKafka
+public class KafkaConsumerConfig {
+
+    @Bean
+    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String>
+                factory = new ConcurrentKafkaListenerContainerFactory<>();
+        // Set the consumer factory
+        factory.setConsumerFactory(consumerFactory());
+        // Number of concurrent consumer threads in the group
+        factory.setConcurrency(3);
+        // Poll timeout (ms)
+        factory.getContainerProperties().setPollTimeout(3000);
+        return factory;
+    }
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+    }
+
+    @Bean
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> propsMap = new HashMap<>();
+        // Kafka broker address
+        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.27.128.180:9000");
+        // Whether to auto-commit offsets (default true)
+        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        // Auto-commit interval (ms)
+        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        // Session timeout (ms)
+        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "25000");
+        // Heartbeat interval (ms)
+        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "9000");
+
+        // Key deserializer
+        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        // Value deserializer
+        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // auto.offset.reset policy:
+        // (1) earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
+        // (2) latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced records
+        // (3) none: if every partition has a committed offset, resume from them; if any partition lacks one, throw an exception
+        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return propsMap;
+    }
+
+}

+ 75 - 0
src/main/java/com/citygis/config/KafkaProducerConfig.java

@@ -0,0 +1,75 @@
+package com.citygis.config;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Kafka producer configuration (broker 195.1.21.59).
+ */
+@Configuration
+@EnableKafka
+public class KafkaProducerConfig {
+
+    /**
+     * Producer template configuration
+     */
+    @Bean(name="kafkaTemplate")
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+    /**
+     * Producer factory configuration
+     */
+    public ProducerFactory<String, String> producerFactory() {
+        return new DefaultKafkaProducerFactory<>(producerConfigs());
+    }
+
+    /**
+     * Producer properties
+     */
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        // Kafka cluster bootstrap address(es); multiple brokers may be listed, comma-separated
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "195.1.21.59:9092");
+        // Number of retries; 0 disables the retry mechanism
+        props.put(ProducerConfig.RETRIES_CONFIG, 3);
+        // acks=0: the send is considered successful once the message leaves the producer
+        // acks=1: the send is considered successful once the leader partition has written the message to disk
+        // acks=all: the send is considered successful only after the leader's follower replicas have also synced the message
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+        // How long send() may block when the producer's buffer is full (default 60s)
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
+        // Batch size, in bytes
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);
+        // Linger for 1 ms so sends are batched, reducing the number of produce requests and improving throughput
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+        // Total memory, in bytes, the producer may use to buffer records waiting to be sent
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 52428800);
+        // Maximum size of a single request; a sent message cannot exceed this limit (default 1048576, i.e. 1MB)
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
+        // Key serializer
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        // Value serializer
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        // Compression type: none, lz4, gzip, or snappy (default none).
+        // Consumers decompress automatically, so compression is configured only on the producer side.
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
+
+        return props;
+    }
+
+}

+ 38 - 0
src/main/java/com/citygis/config/KafkaTopicConfig.java

@@ -0,0 +1,38 @@
+package com.citygis.config;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaAdmin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Kafka admin/topic configuration (broker 195.1.21.59).
+ */
+@Configuration
+public class KafkaTopicConfig {
+
+    /**
+     * Defines a KafkaAdmin bean, which can detect whether a topic exists in the cluster and create it if not
+     */
+    @Bean
+    public KafkaAdmin kafkaAdmin() {
+        Map<String, Object> configs = new HashMap<>();
+        // Kafka cluster bootstrap addresses, e.g. 192.168.2.11:9092,192.168.2.12:9092,192.168.2.13:9092
+        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"195.1.21.59:9092");
+        return new KafkaAdmin(configs);
+    }
+
+//    /**
+//     * Create a topic
+//     */
+//    @Bean
+//    public NewTopic topicinfo() {
+//        // Creating a topic requires the topic name, the partition count, and the replication factor
+//        // (the replication factor must be at least 1 and no greater than the number of brokers)
+//        return new NewTopic("cdc", 3, (short) 1);
+//    }
+
+}

+ 34 - 0
src/main/java/com/citygis/config/RedisConfig.java

@@ -0,0 +1,34 @@
+package com.citygis.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+@Configuration
+public class RedisConfig {
+
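+    /**
+     * RedisTemplate with String keys and JSON-serialized values; used by DataUtil
+     * to store the per-day send/receive counters.
+     */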
+    @Bean
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(connectionFactory);
+        template.setKeySerializer(new StringRedisSerializer());
+        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
+        return template;
+    }
+
+}

+ 70 - 0
src/main/java/com/citygis/config/VisiableThreadPoolTaskExecutor.java

@@ -0,0 +1,70 @@
+package com.citygis.config;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * ThreadPoolTaskExecutor that logs pool status before each execute/submit (Kafka consumer side).
+ *
+ * @author zyl
+ * @date 2023-02-24 13:43
+ */
+public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
+    public static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
+
+    private void showThreadPoolInfo(String prefix) {
+        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
+
+        if (null == threadPoolExecutor) {
+            return;
+        }
+        logger.info("{}, {}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
+                this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(),
+                threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(),
+                threadPoolExecutor.getQueue().size());
+    }
+
+    @Override
+    public void execute(Runnable task) {
+        showThreadPoolInfo("1. do execute");
+        super.execute(task);
+    }
+
+    @Override
+    public void execute(Runnable task, long startTimeout) {
+        showThreadPoolInfo("2. do execute");
+        super.execute(task, startTimeout);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        showThreadPoolInfo("1. do submit");
+        return super.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        showThreadPoolInfo("2. do submit");
+        return super.submit(task);
+    }
+
+    @Override
+    public ListenableFuture<?> submitListenable(Runnable task) {
+        showThreadPoolInfo("1. do submitListenable");
+        return super.submitListenable(task);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
+        showThreadPoolInfo("2. do submitListenable");
+        return super.submitListenable(task);
+    }
+
+}

+ 22 - 0
src/main/java/com/citygis/controller/TestController.java

@@ -0,0 +1,22 @@
+package com.citygis.controller;
+
+import org.springframework.web.bind.annotation.CrossOrigin;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @Package com.citygis.controller
+ * @Author zyl
+ * @Date 2024/1/15 14:12
+ * @description: Test endpoint
+ */
+@RestController
+@CrossOrigin
+public class TestController {
+
+    @RequestMapping("/testKafkaConsumer")
+    public void test(){
+        System.out.println("Verification succeeded!");
+    }
+
+}

+ 312 - 0
src/main/java/com/citygis/impl/KafkaConsumerService.java

@@ -0,0 +1,312 @@
+package com.citygis.impl;
+
+import com.citygis.utils.DataUtil;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
+
+import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+@Service
+public class KafkaConsumerService {
+
+    @Resource
+    DataUtil dataUtil;
+
+
+    @Autowired
+    private KafkaProducerService producerService;
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_jyxx"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_jyxx(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        // Format the current timestamp
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_jyxx_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_jyxx", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_sccy"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_sccy(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_sccy_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_sccy", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_mbdj"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_mbdj(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_mbdj_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_mbdj", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_mbdj_jzs"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_mbdj_jzs(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_mbdj_jzs_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_mbdj_jzs", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_mbsf_fyxx"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_mbsf_fyxx(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_mbsf_fyxx_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_mbsf_fyxx", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_mbsf_nczgw_jbs"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_mbsf_nczgw_jbs(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_mbsf_nczgw_jbs_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_mbsf_nczgw_jbs", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_mbsfk"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_mbsfk(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_mbsfk_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_mbsfk", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_tnbscdj"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_tnbscdj(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_tnbscdj_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_tnbscdj", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_tzcl"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_tzcl(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_tzcl_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_tzcl", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_zg"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_zg(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_zg_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_zg", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_tb_sfjh"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_tb_sfjh(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_tb_sfjh_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_tb_sfjh", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_tb_xt_deadcard"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_tb_xt_deadcard(String message) throws ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_tb_xt_deadcard_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_tb_xt_deadcard", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_tb_xt_gldxjbxx"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_tb_xt_gldxjbxx(String message) throws ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_tb_xt_gldxjbxx_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_tb_xt_gldxjbxx", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_xt_sczzjg"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_xt_sczzjg(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_xt_sczzjg_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_xt_sczzjg", dateFormat, message);
+    }
+
+    @KafkaListener(topics = {"cdcmb_cdc_fr_gxy_alert"}, groupId = "group1", containerFactory = "kafkaListenerContainerFactory")
+    public void kafkaListenercdcmb_cdc_fr_gxy_alert(String message) throws  ExecutionException, InterruptedException, TimeoutException {
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String dateFormat = format.format(new Date());
+
+
+        // Take the date part so each day gets its own Redis key
+        String splitData = dateFormat.split(" ")[0];
+
+        // Build the Redis key
+        String key = splitData + "_cdcmb_cdc_fr_gxy_alert_XF";
+
+        // Record the consumed-message count in Redis
+        dataUtil.saveRedis(key);
+
+        producerService.sendMessageSync("cdcmb_cdc_fr_gxy_alert", dateFormat, message);
+    }
+
+
+}

+ 77 - 0
src/main/java/com/citygis/impl/KafkaProducerService.java

@@ -0,0 +1,77 @@
+package com.citygis.impl;
+
+import com.citygis.utils.DataUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+@Service
+public class KafkaProducerService {
+
+    @Autowired
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @Resource
+    DataUtil dataUtil;
+
+
+    /**
+     * Producer: send data synchronously
+     *
+     * @param topic   topic name
+     * @param key     message key
+     * @param message payload to send
+     */
+    public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
+
+        try {
+            // Format the current timestamp
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            String dateFormat = format.format(new Date());
+
+            // Take the date part so each day gets its own Redis key
+            String splitData = dateFormat.split(" ")[0];
+
+            // Build the Redis key
+            String redisKey = splitData + "_" + topic + "_FS";
+
+            // Record the sent-message count in Redis
+            dataUtil.saveRedis(redisKey);
+
+            kafkaTemplate.send(topic, key, message);
+        } catch (Exception e) {
+            // Swallowing the exception would lose failures silently; at least log it
+            e.printStackTrace();
+        }
+    }
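+
+    // A truly synchronous variant (sketch): block on the future returned by send() so delivery
+    // failures surface to the caller (assumes java.util.concurrent.TimeUnit is imported):
+    //
+    //     kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);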
+
+    /**
+     * Producer: send data asynchronously
+     *
+     * @param topic   topic name
+     * @param message payload to send
+     */
+    public void sendMessageAsync(String topic, String message) {
+        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
+        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
+            @Override
+            public void onSuccess(SendResult<String, String> result) {
+                System.out.println("success");
+            }
+
+            @Override
+            public void onFailure(Throwable ex) {
+                System.out.println("failure");
+            }
+        });
+    }
+
+}

+ 19 - 0
src/main/java/com/citygis/impl/KafkaServiceImpl.java

@@ -0,0 +1,19 @@
+package com.citygis.impl;
+
+import com.citygis.mapper.KafkaMapper;
+import com.citygis.pojo.Para;
+import com.citygis.service.KafkaService;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+@Service
+public class KafkaServiceImpl implements KafkaService {
+
+    @Resource
+    KafkaMapper kafkaMapper;
+
+    @Override
+    public int addKafkaCount(Para para) {
+        return kafkaMapper.addKafkaCount(para);
+    }
+}

+ 11 - 0
src/main/java/com/citygis/mapper/KafkaMapper.java

@@ -0,0 +1,11 @@
+package com.citygis.mapper;
+
+import com.citygis.pojo.Para;
+import org.apache.ibatis.annotations.Mapper;
+
+@Mapper
+public interface KafkaMapper {
+
+    int addKafkaCount(Para para);
+
+}

+ 17 - 0
src/main/java/com/citygis/pojo/Para.java

@@ -0,0 +1,17 @@
+package com.citygis.pojo;
+
+import lombok.Data;
+
+/**
+ * @author zyl
+ * @date 2023-05-18 10:38
+ */
+@Data
+public class Para {
+
+    private String sendCount;
+    private String receiveCount;
+    private String time;
+    private String tabName;
+
+}

+ 13 - 0
src/main/java/com/citygis/service/KafkaService.java

@@ -0,0 +1,13 @@
+package com.citygis.service;
+
+import com.citygis.pojo.Para;
+
+/**
+ * @author zyl
+ * @date 2023-02-24 9:32
+ */
+public interface KafkaService {
+
+    int addKafkaCount(Para para);
+
+}

+ 22 - 0
src/main/java/com/citygis/test/test.java

@@ -0,0 +1,22 @@
+package com.citygis.test;
+
+import com.citygis.utils.DataUtil;
+import org.springframework.web.bind.annotation.CrossOrigin;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
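+/**
+ * Manual trigger for DataUtil's scheduled persistence job (test endpoint).
+ */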
+@RestController
+@CrossOrigin
+public class test {
+
+    @Resource
+    DataUtil dataUtil;
+
+    @RequestMapping("/saveData")
+    public void test(){
+        dataUtil.saveDataBaseByKafkaData();
+    }
+
+}

+ 124 - 0
src/main/java/com/citygis/utils/DataUtil.java

@@ -0,0 +1,124 @@
+package com.citygis.utils;
+
+import com.citygis.pojo.Para;
+import com.citygis.service.KafkaService;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.time.LocalDate;
+import java.time.YearMonth;
+
+@Service
+public class DataUtil {
+
+    @Resource
+    KafkaService kafkaService;
+
+
+    @Resource
+    private RedisTemplate<String, Object> redisTemplate;
+
+    // The 15 chronic-disease tables
+    static String[] tableNames = {"cdcmb_cdc_fr_jyxx",
+            "cdcmb_cdc_fr_sccy",
+            "cdcmb_cdc_fr_mbdj",
+            "cdcmb_cdc_fr_mbdj_jzs",
+            "cdcmb_cdc_fr_mbsf_fyxx",
+            "cdcmb_cdc_fr_mbsf_nczgw_jbs",
+            "cdcmb_cdc_fr_mbsfk",
+            "cdcmb_cdc_fr_tnbscdj",
+            "cdcmb_cdc_fr_tzcl",
+            "cdcmb_cdc_fr_zg",
+            "cdcmb_cdc_tb_sfjh",
+            "cdcmb_cdc_tb_xt_deadcard",
+            "cdcmb_cdc_tb_xt_gldxjbxx",
+            "cdcmb_cdc_xt_sczzjg",
+            "cdcmb_cdc_fr_gxy_alert"};
+
+    public void saveRedis(String key) {
+        // If the key already exists in Redis, increment it; otherwise create it with value 1
+        if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
+            Integer value = (int) redisTemplate.opsForValue().get(key);
+            Integer i = value + 1;
+            redisTemplate.opsForValue().set(key, i);
+        } else {
+            redisTemplate.opsForValue().set(key, 1);
+        }
+    }
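+
+    // Note: the read-modify-write above is not atomic when several listener threads hit the
+    // same key; a sketch of an atomic variant using Redis INCR via Spring Data:
+    //
+    //     redisTemplate.opsForValue().increment(key); // creates the key at 1 if absent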
+
+    /**
+     * At 1 a.m. every day, persist yesterday's counters to the database
+     */
+    @Scheduled(cron = "0 0 1 * * ?")
+    public void saveDataBaseByKafkaData() {
+        LocalDate currentDate = LocalDate.now(); // today's date
+        LocalDate date = currentDate.minusDays(1); // yesterday's date
+
+        for (String tableName : tableNames) {
+            String redisKeyXF = date + "_" + tableName + "_XF";
+            String redisKeyFS = date + "_" + tableName + "_FS";
+
+
+            Object xf = redisTemplate.opsForValue().get(redisKeyXF);
+            Object fs = redisTemplate.opsForValue().get(redisKeyFS);
+
+            int valueXF = 0;
+            int valueFS = 0;
+            if (xf != null) {
+                // Unbox the consumed-message count
+                valueXF = (int) xf;
+            }
+            if (fs != null) {
+                valueFS = (int) fs;
+            }
+
+            Para para = new Para();
+            para.setSendCount(String.valueOf(valueFS));
+            para.setReceiveCount(String.valueOf(valueXF));
+            para.setTime(date.toString());
+            para.setTabName(tableName);
+            kafkaService.addKafkaCount(para);
+            System.out.println(date + "---scheduled persistence executed");
+        }
+    }
+
+    /**
+     * On the 28th of each month, delete last month's Redis data
+     */
+    @Scheduled(cron = "0 0 0 28 * ?")
+    public void deleteLastMonthRedisData() {
+
+        YearMonth currentYearMonth = YearMonth.now(); // current year-month
+        YearMonth lastMonth = currentYearMonth.minusMonths(1); // last month's year-month
+
+        int year = lastMonth.getYear();
+        int month = lastMonth.getMonthValue();
+        int days = lastMonth.lengthOfMonth(); // number of days in that month
+
+        for (int day = 1; day <= days; day++) {
+            LocalDate date = LocalDate.of(year, month, day);
+            // Delete last month's Redis data
+            for (String tableName : tableNames) {
+                String redisKeyXF = date + "_" + tableName + "_XF";
+                String redisKeyFS = date + "_" + tableName + "_FS";
+                // Delete the keys
+                redisTemplate.delete(redisKeyXF);
+                redisTemplate.delete(redisKeyFS);
+            }
+        }
+        System.out.println("scheduled deletion executed");
+    }
+}

+ 64 - 0
src/main/java/com/citygis/utils/SystemMonitor.java

@@ -0,0 +1,64 @@
+package com.citygis.utils;
+import com.sun.management.OperatingSystemMXBean;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+
+/**
+ * @Package com.citygis.utils
+ * @Author zyl
+ * @Date 2024/1/22 9:54
+ * @description: Monitor network bandwidth and local memory
+ */
+
+public class SystemMonitor {
+
+    public static void main(String[] args) throws Exception {
+        String host = "example.com"; // replace with the actual host name or IP address
+        int port = 80; // replace with the actual port
+
+        try (Socket socket = new Socket()) {
+            socket.connect(new InetSocketAddress(host, port), 1000); // connect to the host/port with a 1-second timeout
+
+            // Send side
+            long startSendTime = System.nanoTime();
+            socket.getOutputStream().write(new byte[1024]); // write a 1 KB packet
+            socket.getOutputStream().flush();
+            long endSendTime = System.nanoTime();
+
+            // Receive side
+            long startReceiveTime = System.nanoTime();
+            socket.getInputStream().skip(1024); // skip up to 1 KB of response data (may return early)
+            long endReceiveTime = System.nanoTime();
+
+            long sendNanos = Math.max(1, endSendTime - startSendTime); // guard against a zero-length interval
+            long sendBandwidthBps = (long) (1024.0 * 8 * 1_000_000_000L / sendNanos); // send bandwidth (bits per second)
+            System.out.println("Send bandwidth: " + sendBandwidthBps / 1024 + " Kbit/s");
+
+            long receiveNanos = Math.max(1, endReceiveTime - startReceiveTime);
+            long receiveBandwidthBps = (long) (1024.0 * 8 * 1_000_000_000L / receiveNanos); // receive bandwidth (bits per second)
+            System.out.println("Receive bandwidth: " + receiveBandwidthBps / 1024 + " Kbit/s");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        // Monitor local physical memory
+        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
+        long totalMemorySize = osBean.getTotalPhysicalMemorySize(); // total physical memory
+        long freeMemorySize = osBean.getFreePhysicalMemorySize(); // free physical memory
+        long usedMemorySize = totalMemorySize - freeMemorySize; // used physical memory
+
+        System.out.println("Total memory: " + totalMemorySize / 1024 / 1024 + " MB");
+        System.out.println("Used memory: " + usedMemorySize / 1024 / 1024 + " MB");
+    }
+}

+ 47 - 0
src/main/resources/application.yml

@@ -0,0 +1,47 @@
+spring:
+  redis:
+    host: 127.0.0.1
+    port: 6379
+    jedis:
+      pool:
+        max-active: 8  # maximum number of connections
+        max-idle: 8  # maximum number of idle connections
+        min-idle: 0  # minimum number of idle connections
+        max-wait: -1  # maximum wait time (ms)
+
+    lettuce:
+      pool:
+        max-active: 8  # maximum number of connections
+        max-idle: 8  # maximum number of idle connections
+        min-idle: 0  # minimum number of idle connections
+        max-wait: -1  # maximum wait time (ms)
+
+    timeout: 3000  # connection timeout (ms)
+    database: 0  # Redis database index (default 0)
+  datasource:
+    url: jdbc:sqlserver://195.1.20.27:2433;DatabaseName=MIDDB
+    #    url: jdbc:sqlserver://120.55.160.15:1433;DatabaseName=MIDDB
+    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
+    username: MIDDB
+    password: 1qaz2wsx!
+  servlet:
+    multipart:
+      max-file-size: 1024MB
+      max-request-size: 1024MB
+#  kafka:
+#    producer:
+#      acks:
+mybatis:
+  mapper-locations: classpath*:mapper/*.xml,classpath*:mapper/**/*.xml
+
+#  configuration:
+#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+logging:
+  level:
+    org.apache.kafka.common.serialization.StringDeserializer: ERROR
+    org.apache.kafka: ERROR
+
+server:
+  port: 9998
+
+

+ 17 - 0
src/main/resources/mapper/KafkaMapper.xml

@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.citygis.mapper.KafkaMapper">
+
+
+    <insert id="addKafkaCount" parameterType="com.citygis.pojo.Para" >
+        INSERT INTO [dbo].[tab_kafkaLog]
+            ([sendCount], [receiveCount], [time], [tabName])
+        VALUES
+            (#{sendCount}, #{receiveCount}, #{time}, #{tabName})
+    </insert>
+
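+    <!-- A minimal matching table sketch (column types assumed; the actual DDL is not part of
+         this commit):
+         CREATE TABLE [dbo].[tab_kafkaLog] (
+             [sendCount]    NVARCHAR(20),
+             [receiveCount] NVARCHAR(20),
+             [time]         NVARCHAR(20),
+             [tabName]      NVARCHAR(100)
+         );
+    -->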
+</mapper>

+ 6 - 0
src/main/resources/web/WEB-INF/web.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
+         version="4.0">
+</web-app>

+ 13 - 0
src/test/java/com/citygis/CdcKafkaConsumerApplicationTests.java

@@ -0,0 +1,13 @@
+package com.citygis;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class CdcKafkaConsumerApplicationTests {
+
+    @Test
+    void contextLoads() {
+    }
+
+}