|
@@ -77,9 +77,27 @@ public class TabDatasourceMsgServiceImpl extends ServiceImpl<TabDatasourceMsgMap
|
|
|
tabKafka.setAccessFeedbackId(dataSun.getAccessFeedbackId());
|
|
|
tabKafkaService.save(tabKafka);
|
|
|
|
|
|
- List<TabKafkaSun> tabKafkaSuns = tabKafka.getTabKafkaSuns();
|
|
|
- if (tabKafkaSuns != null) {
|
|
|
- tabKafkaSuns.forEach(tabKafkaSun -> {
|
|
|
+ List<TabKafkaSun> keySuns = tabKafka.getTabKafkaKeySuns();
|
|
|
+ if (keySuns != null) {
|
|
|
+ keySuns.forEach(tabKafkaSun -> {
|
|
|
+ tabKafkaSun.setId(snowflake.nextId());
|
|
|
+ tabKafkaSun.setKafkaId(kafkaId);
|
|
|
+ tabKafkaSunService.save(tabKafkaSun);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ List<TabKafkaSun> valueSuns = tabKafka.getTabKafkaValueSuns();
|
|
|
+ if (valueSuns != null) {
|
|
|
+ valueSuns.forEach(tabKafkaSun -> {
|
|
|
+ tabKafkaSun.setId(snowflake.nextId());
|
|
|
+ tabKafkaSun.setKafkaId(kafkaId);
|
|
|
+ tabKafkaSunService.save(tabKafkaSun);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ List<TabKafkaSun> dataSuns = tabKafka.getTabKafkaDataSuns();
|
|
|
+ if (dataSuns != null) {
|
|
|
+ dataSuns.forEach(tabKafkaSun -> {
|
|
|
tabKafkaSun.setId(snowflake.nextId());
|
|
|
tabKafkaSun.setKafkaId(kafkaId);
|
|
|
tabKafkaSunService.save(tabKafkaSun);
|
|
@@ -134,7 +152,31 @@ public class TabDatasourceMsgServiceImpl extends ServiceImpl<TabDatasourceMsgMap
|
|
|
tabKafka.setAccessFeedbackId(dataSun.getAccessFeedbackId());
|
|
|
tabKafkaService.saveOrUpdate(tabKafka);
|
|
|
|
|
|
- tabKafka.getTabKafkaSuns().forEach(tabKafkaSun -> {
|
|
|
+ tabKafka.getTabKafkaKeySuns().forEach(tabKafkaSun -> {
|
|
|
+ LambdaQueryWrapper<TabKafkaSun> kql = new LambdaQueryWrapper<>();
|
|
|
+ kql.eq(TabKafkaSun::getId, tabKafkaSun.getId());
|
|
|
+ TabKafkaSun one = tabKafkaSunService.getOne(kql);
|
|
|
+ if (one != null) {
|
|
|
+ tabKafkaSunService.updateById(one);
|
|
|
+ } else {
|
|
|
+ tabKafkaSun.setId(snowflake.nextId());
|
|
|
+ tabKafkaSunService.save(tabKafkaSun);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ tabKafka.getTabKafkaValueSuns().forEach(tabKafkaSun -> {
|
|
|
+ LambdaQueryWrapper<TabKafkaSun> kql = new LambdaQueryWrapper<>();
|
|
|
+ kql.eq(TabKafkaSun::getId, tabKafkaSun.getId());
|
|
|
+ TabKafkaSun one = tabKafkaSunService.getOne(kql);
|
|
|
+ if (one != null) {
|
|
|
+ tabKafkaSunService.updateById(one);
|
|
|
+ } else {
|
|
|
+ tabKafkaSun.setId(snowflake.nextId());
|
|
|
+ tabKafkaSunService.save(tabKafkaSun);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ tabKafka.getTabKafkaDataSuns().forEach(tabKafkaSun -> {
|
|
|
LambdaQueryWrapper<TabKafkaSun> kql = new LambdaQueryWrapper<>();
|
|
|
kql.eq(TabKafkaSun::getId, tabKafkaSun.getId());
|
|
|
TabKafkaSun one = tabKafkaSunService.getOne(kql);
|
|
@@ -221,11 +263,25 @@ public class TabDatasourceMsgServiceImpl extends ServiceImpl<TabDatasourceMsgMap
|
|
|
LambdaQueryWrapper<TabKafka> kql = new LambdaQueryWrapper<>();
|
|
|
kql.eq(TabKafka::getAccessFeedbackId, id);
|
|
|
List<TabKafka> tabKafkas = tabKafkaService.list(kql);
|
|
|
+
|
|
|
tabKafkas.forEach(tabKafka -> {
|
|
|
LambdaQueryWrapper<TabKafkaSun> ks = new LambdaQueryWrapper<>();
|
|
|
ks.eq(TabKafkaSun::getKafkaId, tabKafka.getId());
|
|
|
+ ks.eq(TabKafkaSun::getType, "key");
|
|
|
List<TabKafkaSun> tabKafkaSuns = tabKafkaSunService.list(ks);
|
|
|
- tabKafka.setTabKafkaSuns(tabKafkaSuns);
|
|
|
+ tabKafka.setTabKafkaKeySuns(tabKafkaSuns);
|
|
|
+
|
|
|
+ LambdaQueryWrapper<TabKafkaSun> vs = new LambdaQueryWrapper<>();
|
|
|
+ vs.eq(TabKafkaSun::getKafkaId, tabKafka.getId());
|
|
|
+ vs.eq(TabKafkaSun::getType, "value");
|
|
|
+ List<TabKafkaSun> tabKafkaValueSuns = tabKafkaSunService.list(vs);
|
|
|
+ tabKafka.setTabKafkaValueSuns(tabKafkaValueSuns);
|
|
|
+
|
|
|
+ LambdaQueryWrapper<TabKafkaSun> ds = new LambdaQueryWrapper<>();
|
|
|
+ ds.eq(TabKafkaSun::getKafkaId, tabKafka.getId());
|
|
|
+ ds.eq(TabKafkaSun::getType, "data");
|
|
|
+ List<TabKafkaSun> tabKafkaDataSuns = tabKafkaSunService.list(ds);
|
|
|
+ tabKafka.setTabKafkaDataSuns(tabKafkaDataSuns);
|
|
|
});
|
|
|
|
|
|
|