|
@@ -0,0 +1,122 @@
|
|
|
+package com.citygis.framework.aspectj;
|
|
|
+
|
|
|
+import com.citygis.common.annotation.PushToKafka;
|
|
|
+import com.citygis.common.entity.KafkaEntity;
|
|
|
+import com.citygis.common.utils.StringUtils;
|
|
|
+import io.netty.util.internal.StringUtil;
|
|
|
+import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
+import org.apache.kafka.common.serialization.StringSerializer;
|
|
|
+import org.aspectj.lang.ProceedingJoinPoint;
|
|
|
+import org.aspectj.lang.annotation.Around;
|
|
|
+import org.aspectj.lang.annotation.Aspect;
|
|
|
+import org.aspectj.lang.reflect.MethodSignature;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
+import org.springframework.kafka.core.ProducerFactory;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+
|
|
|
+@Aspect
|
|
|
+@Component
|
|
|
+public class KafkaPushAspect {
|
|
|
+
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(KafkaPushAspect.class);
|
|
|
+
|
|
|
+ private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
+
|
|
|
+ @Around("@annotation(pushToKafka)")
|
|
|
+ public void doPushToKafka(ProceedingJoinPoint joinPoint, PushToKafka pushToKafka) throws Throwable {
|
|
|
+ System.out.println("dddddd");
|
|
|
+ try {
|
|
|
+ // 调用处理函数,将执行结果作为参数传递
|
|
|
+ handleSendMessage(joinPoint.proceed(), pushToKafka);
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 处理异常
|
|
|
+ log.error("Error occurred during method execution: {}", e.getMessage());
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理发送消息到Kafka执行流程
|
|
|
+ * @param result 切点
|
|
|
+ * @param pushToKafka 注解
|
|
|
+ */
|
|
|
+ protected void handleSendMessage(final Object result, PushToKafka pushToKafka) {
|
|
|
+ // 校验目标方法传参的数据类型
|
|
|
+ if (!(result instanceof KafkaEntity)) {
|
|
|
+ log.warn("Invalid result type: {}", result.getClass().getName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String kafkaHost = replaceValidation(pushToKafka.host(), ((KafkaEntity) result).getBootstrapAddress());
|
|
|
+ String kafkaPort = replaceValidation(pushToKafka.port(), ((KafkaEntity) result).getPort());
|
|
|
+ String kafkaTopic = replaceValidation(pushToKafka.topic(), ((KafkaEntity) result).getTopic());
|
|
|
+ String kafkaKey = replaceValidation(pushToKafka.key(), ((KafkaEntity) result).getKey());
|
|
|
+ String kafkaValue = replaceValidation(pushToKafka.value(), ((KafkaEntity) result).getValue());
|
|
|
+
|
|
|
+ // kafka参数异常处理
|
|
|
+ if (
|
|
|
+ StringUtils.isEmpty(kafkaHost) || StringUtils.isEmpty(kafkaPort) ||
|
|
|
+ StringUtils.isEmpty(kafkaTopic) || StringUtils.isEmpty(kafkaKey) || StringUtils.isEmpty(kafkaValue)
|
|
|
+ ) {
|
|
|
+ log.warn("Invalid Kafka configuration: topic or host is empty");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ int kafkaPortInt;
|
|
|
+ try {
|
|
|
+ kafkaPortInt = Integer.parseInt(kafkaPort);
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.error("Invalid Kafka port format: {}", kafkaPort);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 配置 KafkaTemplate
|
|
|
+ configureKafkaTemplate(kafkaHost, kafkaPortInt);
|
|
|
+
|
|
|
+ // 推送消息到kafka
|
|
|
+ try {
|
|
|
+ kafkaTemplate.send(kafkaTopic, kafkaKey, kafkaValue);
|
|
|
+ log.info("Pushed to Kafka: Topic={}, Key={}, Value={}", kafkaTopic, kafkaKey, kafkaValue);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error occurred while pushing to Kafka: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 变量替换
|
|
|
+ * @param initialValue 原始值
|
|
|
+ * @param compareValue 比对值
|
|
|
+ * @return 解析后的值
|
|
|
+ */
|
|
|
+ private String replaceValidation(String initialValue, String compareValue) {
|
|
|
+ if (compareValue != null && !compareValue.isEmpty()) {
|
|
|
+ return compareValue;
|
|
|
+ }
|
|
|
+ return initialValue;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 配置KafkaTemplate
|
|
|
+ * @param kafkaHost Kafka主机地址
|
|
|
+ * @param kafkaPort Kafka端口号
|
|
|
+ */
|
|
|
+ private void configureKafkaTemplate(String kafkaHost, int kafkaPort) {
|
|
|
+ Map<String, Object> configProps = new HashMap<>();
|
|
|
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
|
|
|
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
+
|
|
|
+ ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
|
|
|
+ kafkaTemplate = new KafkaTemplate<>(producerFactory);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|