
Commit Kafka verification test

zk · 1 year ago
parent · commit 588aa466e6
1 changed file with 218 additions and 0 deletions
  1. src/main/java/com/citygis/service/KafkaJianKongTest.java  +218 −0

+ 218 - 0
src/main/java/com/citygis/service/KafkaJianKongTest.java

@@ -0,0 +1,218 @@
+package com.citygis.service;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.TopicPartition;
+
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.*;
+import java.util.Date;
+
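+/**
+ * End-to-end Kafka delivery verification: produces tagged test messages,
+ * consumes them into a DM (Dameng) database table, and reconciles the stored
+ * row counts against the counters carried in each message.
+ */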
+public class KafkaJianKongTest {
+    private static final String TOPIC = "test_topic666666";
+    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
+
+    public static void main(String[] args) throws SQLException, ClassNotFoundException {
+
+//        producer();
+//        testQuery500w();
+//        testQueryfei100w();
+//        testQueryfei500w();
+//        consumer();
+        dataAnalysis();
+//        Class.forName("dm.jdbc.driver.DmDriver");
+//        String jdbcUrl2 = "jdbc:dm://127.0.0.1:5236/test?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8";
+//        Connection conn2 = DriverManager.getConnection(jdbcUrl2, "SYSDBA", "SYSDBA");
+//
+//        PreparedStatement ps_dms = conn2.prepareStatement("SELECT * FROM \"test\".\"TABLE_1\" order by daySize desc LIMIT ?,?  ");
+//        ps_dms.setInt(1, 0);
+//        ps_dms.setInt(2, 1);
+//        ResultSet rs_dms = ps_dms.executeQuery();
+//        String data="";
+//        while (rs_dms.next()) {
+//            data = rs_dms.getString("sql");
+//        }
+//        System.out.println(data);
+//        rs_dms.close();
+//        ps_dms.close();
+//        conn2.close();
+    }
+
+    static void producer() {
+        // Producer example
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+
+        Producer<String, String> producer = new KafkaProducer<>(producerProps);
+
+        String message = "{\n" +
+                "\"tablename\":\"T_C5_TJXXB\",\n" +
+                "\"orgSql\": \"INSERT INTO sales_info VALUES ('2022-01-01', 'product1', 100), ('2022-01-01', 'product2', 200), ('2022-01-02', 'product3', 300);\",\n" +
+                "\"sourceType\":\"sqlserver\",\n" +
+                "\"firest\":\"onlyKey(确保唯一的字符串类型)\",\n" +
+                "\"last\":\"onlyKey(确保唯一的字符串类型)\",\n" +
+                "\"onlyKey\":\"识别传输本条唯一、可以数据源+topic+时间+雪花算法等设定,后续可给出具体项\",\n" +
+                "\"listSize\":\"1\",\n" +
+                "\"listNumber\":\"批次号唯一值\",\n" +
+                "\"daySize\":\"10\",\n" +
+                "\"sendTime\":\"2024-03-22 9:00:00\",\n" +
+                "\"data\":[{ \"field\":\"JHBLGLH,YX,TTPYBLB,TTPJCRQ,TTPFS,KSRSJG,YSRSJG,TPYYBLB,TPYJCRQ,PYJG,DATATIME,OPTION_TYPE\",\n" +
+                "\"fieldtype\":\"varchar,number,varchar,date,varchar,varchar,varchar,varchar,date,varchar,DATETIME,VARCHAR2\",\n" +
+                "\"values\":\"11,1,1,2021-04-08,1,1,1,1,2021-04-08 00:00:00,1,2021-03-01 00:00:00,0\",\n" +
+                "\"primary\":\"JHBLGLH,PYJG\",\n" +
+                "\"where\":\"1,1\"\n" +
+                "}" +
+                "]\n" +
+                "}";
+
+        JSONObject jsonObject = JSON.parseObject(message);
+
+
+        System.out.println("开始时间" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date()));
+
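+        // Send 10 batches of 100 messages. Each message is tagged with its
+        // batch number (listNumber), the keys of the first and last records in
+        // the batch (firest/last), its own key (onlyKey), and a running counter
+        // (daySize) so the consumer side can reconcile completeness. Note that
+        // String.hashCode() is used here for brevity and is not collision-free,
+        // so it is not a truly unique key.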
+        int k = 0;
+        for (int i = 1; i <= 10; i++) {
+            String listNumber = "batch-" + i;
+            jsonObject.put("listNumber", listNumber.hashCode());
+            for (int j = 1; j <= 100; j++) {
+                String firest = "batch-" + i + "-record-" + 1;
+                String last = "batch-" + i + "-record-" + 100;
+                String onlyKey = "batch-" + i + "-record-" + j;
+                jsonObject.put("firest", firest.hashCode());
+                jsonObject.put("last", last.hashCode());
+                jsonObject.put("onlyKey", onlyKey.hashCode());
+                jsonObject.put("daySize", k++);
+                jsonObject.put("listSize", 100);
+                System.out.println(jsonObject.toString());
+                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, jsonObject.toString());
+                producer.send(record, (metadata, exception) -> {
+                    if (exception != null) {
+                        System.err.println("Failed to send message: " + exception.getMessage());
+                    }
+                });
+            }
+        }
+        producer.close();
+        System.out.println("结束时间" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date()));
+    }
+
+
+    static void consumer() throws SQLException, ClassNotFoundException {
+        // Consumer example
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        // auto.offset.reset: latest = only new messages, none = error when no
+        // committed offset exists, earliest = start from the oldest data
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "G011");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+//        consumerProps.put("isolation.level", "read_committed"); // 设置为read_committed来保证消息顺序性
+        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Collections.singletonList(TOPIC));
+        int count = 0;
+        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
+        while (true) {
+            JSONArray jsonArray = new JSONArray();
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<String, String> record : records) {
+                JSONObject jsonObject = JSON.parseObject(record.value());
+                jsonArray.add(jsonObject);
+                System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " payload: " + jsonObject);
+                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
+                if (count % 1000 == 0) {
+                    consumer.commitAsync(currentOffsets, null);
+                }
+                count++;
+            }
+            if (!jsonArray.isEmpty()) {
+                saveData(jsonArray);
+            }
+        }
+    }
+
+
+    public static boolean saveData(JSONArray jsonArray) throws ClassNotFoundException, SQLException {
+
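+        // Persist each consumed message: insert the raw JSON string together
+        // with its daySize counter and listNumber batch tag so the totals can
+        // be reconciled later. A new DM connection is opened per call.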
+        Class.forName("dm.jdbc.driver.DmDriver");
+        String jdbcUrl2 = "jdbc:dm://127.0.0.1:5236/test?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8";
+        Connection conn2 = DriverManager.getConnection(jdbcUrl2, "SYSDBA", "SYSDBA");
+
+        PreparedStatement ps = conn2.prepareStatement("insert into \"test\".\"TABLE_1\" values (?,?,?,?)");
+
+        for (Object jsonObject : jsonArray) {
+            JSONObject jsonObject1 = (JSONObject) jsonObject;
+            ps.setDate(1, new java.sql.Date(new Date().getTime()));
+            ps.setString(2, jsonObject1.toString());
+            ps.setString(3, jsonObject1.getString("daySize"));
+            ps.setString(4, jsonObject1.getString("listNumber"));
+            ps.addBatch();
+        }
+        boolean state = false;
+        int[] execute = ps.executeBatch();
+        if (execute.length > 0) {
+            System.out.println("Rows processed in batch: " + execute.length);
+            state = true;
+        }
+        ps.close();
+        conn2.close();
+        return state;
+    }
+
+    public static void dataAnalysis() throws ClassNotFoundException, SQLException {
+        Class.forName("dm.jdbc.driver.DmDriver");
+        String jdbcUrl2 = "jdbc:dm://127.0.0.1:5236/test?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8";
+        Connection conn2 = DriverManager.getConnection(jdbcUrl2, "SYSDBA", "SYSDBA");
+
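+        // Verification queries: (1) total stored rows, (2) the running counter
+        // in the most recent message vs. the stored total, and (3) per-batch
+        // stored counts vs. the declared listSize.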
+        PreparedStatement ps_dm = conn2.prepareStatement("SELECT count(1) FROM \"test\".\"TABLE_1\"");
+        ResultSet rs_dm = ps_dm.executeQuery();
+
+        int total = 0;
+        while (rs_dm.next()) {
+            total = rs_dm.getInt(1);
+        }
+        rs_dm.close();
+        System.out.println("Total rows stored: " + total);
+        ps_dm.close();
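+        // Fetch the single most recent row (highest daySize); its stored JSON
+        // (the "sql" column) carries the counters used in the totals check below.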
+        PreparedStatement ps_dms = conn2.prepareStatement("SELECT * FROM \"test\".\"TABLE_1\" order by daySize desc LIMIT ?,?  ");
+        ps_dms.setInt(1, 0);
+        ps_dms.setInt(2, 1);
+        ResultSet rs_dms = ps_dms.executeQuery();
+
+        String data = "";
+        while (rs_dms.next()) {
+            data = rs_dms.getString("sql");
+        }
+        JSONObject jsonObject = new JSONObject();
+        if (ObjectUtils.isNotEmpty(data)) {
+            jsonObject = JSON.parseObject(data);
+        }
+
+        Integer dataSize = jsonObject.getInteger("daySize");   // latest running counter carried in the data
+        Integer listSize = jsonObject.getInteger("listSize");  // declared per-batch size carried in the data
+        int declaredTotal = dataSize == null ? 0 : dataSize + 1;  // daySize starts at 0
+        System.out.println("Totals check - count declared by data: " + declaredTotal + ", rows stored: " + total);
+        rs_dms.close();
+        ps_dms.close();
+
+        PreparedStatement ps_list = conn2.prepareStatement("select \"listNumber\", count(1) as countSize from \"test\".\"TABLE_1\" group by \"listNumber\"");
+        ResultSet rs_list = ps_list.executeQuery();
+        while (rs_list.next()) {
+            int size = rs_list.getInt("countSize");
+            System.out.println("Batch: " + rs_list.getString("listNumber") + " declared batch size: " + listSize + " rows stored: " + size);
+        }
+        rs_list.close();
+        ps_list.close();
+        conn2.close();
+        System.out.println("End time: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new java.util.Date()));
+    }
+}