Browse Source

添加kafka数据生成

zk 8 months ago
parent
commit
a7671f770d
2 changed files with 70 additions and 0 deletions
  1. 5 0
      pom.xml
  2. 65 0
      src/main/java/com/citygis/service/KafkaTest.java

+ 5 - 0
pom.xml

@@ -74,6 +74,11 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.8.0</version>
+        </dependency>
 
     </dependencies>
 

+ 65 - 0
src/main/java/com/citygis/service/KafkaTest.java

@@ -0,0 +1,65 @@
+package com.citygis.service;
+
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.clients.consumer.*;
+
+import java.util.Collections;
+import java.util.Properties;
+ class KafkaTest {
+    private static final String TOPIC = "test-topic-500w";
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+
+    public static void main(String[] args) {
+
+        producer();
+
+    }
+
+     static void   producer(){
+         // 生产者示例
+         Properties producerProps = new Properties();
+         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+
+         Producer<String, String> producer = new KafkaProducer<>(producerProps);
+
+         String message = "Hello, Kafka!";
+         ProducerRecord<String, String> records = new ProducerRecord<>(TOPIC, message);
+
+         for (int i=1;i<=5000000;i++) {
+             int finalI = i;
+             producer.send(records, new Callback() {
+                 @Override
+                 public void onCompletion(RecordMetadata metadata, Exception exception) {
+                     if (exception != null) {
+                         System.err.println("Failed to send message: " + exception.getMessage());
+                     } else {
+                         System.out.println("Sent message: topic="+TOPIC+", offset=%d%n,%n"+ finalI);
+                     }
+                 }
+             });
+         }
+         producer.close();
+     }
+
+
+    static void consumer()
+    {
+        // 消费者示例
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Collections.singletonList(TOPIC));
+        while (true) {
+            ConsumerRecords<String, String> recordss = consumer.poll(100);
+            for (ConsumerRecord<String, String> record : recordss) {
+                System.out.printf("Received message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
+                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
+            }
+        }
+    }
+}