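I'm trying to join data from two topics, person and address, where one person can have multiple addresses. The data published to the topics looks like this: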
//person with id as key
{"id": "123", "name": "Tom Tester"}
//addresses with id as key
{"id": "321", "person_id": "123", "address": "Somestreet 12, 4321 Somewhere"}
{"id": "432", "person_id": "123", "address": "Otherstreet 12, 5432 Nowhere"}
After the join, I want an aggregated output (to be indexed in Elasticsearch) that should look like this:
{
  "id": "123",
  "name": "Tom Tester",
  "addresses": [
    {
      "id": "321",
      "address": "Somestreet 12, 4321 Somewhere"
    },
    {
      "id": "432",
      "address": "Otherstreet 12, 5432 Nowhere"
    }
  ]
}
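To make the target shape concrete, here is a minimal plain-Java sketch of the one-to-many grouping, with no Kafka involved. The `Person`, `Address` and `PersonAggregation` classes below are hypothetical stand-ins for the payloads above, not the classes from my project, and `aggregate` is only an illustration of the expected result, not of how Kafka Streams computes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PersonAggregationSketch {

    // Hypothetical stand-in for a record on the person topic.
    static final class Person {
        final String id;
        final String name;
        Person(String id, String name) { this.id = id; this.name = name; }
    }

    // Hypothetical stand-in for a record on the address topic.
    static final class Address {
        final String id;
        final String personId;
        final String address;
        Address(String id, String personId, String address) {
            this.id = id; this.personId = personId; this.address = address;
        }
    }

    // Hypothetical stand-in for the aggregated output document.
    static final class PersonAggregation {
        final String id;
        final String name;
        final List<Address> addresses;
        PersonAggregation(String id, String name, List<Address> addresses) {
            this.id = id; this.name = name; this.addresses = addresses;
        }
    }

    // Groups addresses by person_id, then attaches each group to its person.
    // A person without addresses gets an empty list (left-join behaviour).
    static Map<String, PersonAggregation> aggregate(List<Person> persons, List<Address> addresses) {
        Map<String, List<Address>> byPerson = new LinkedHashMap<>();
        for (Address a : addresses) {
            byPerson.computeIfAbsent(a.personId, k -> new ArrayList<>()).add(a);
        }
        Map<String, PersonAggregation> result = new LinkedHashMap<>();
        for (Person p : persons) {
            List<Address> own = byPerson.getOrDefault(p.id, new ArrayList<>());
            result.put(p.id, new PersonAggregation(p.id, p.name, own));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(new Person("123", "Tom Tester"));
        List<Address> addresses = Arrays.asList(
                new Address("321", "123", "Somestreet 12, 4321 Somewhere"),
                new Address("432", "123", "Otherstreet 12, 5432 Nowhere"));
        PersonAggregation agg = aggregate(persons, addresses).get("123");
        System.out.println(agg.name + " has " + agg.addresses.size() + " addresses");
    }
}
```

Running `main` with the two sample addresses prints `Tom Tester has 2 addresses`.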
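Whenever the person topic or the address topic receives an update, the aggregated person should be updated as well. Currently, I only get an updated aggregated person when an address is published, not when the person itself is changed. Any idea what is wrong with this code?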
@SpringBootApplication
@EnableBinding(PersonAggregatorBinding.class)
public class KafkaStreamTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }

    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamTestApplication.class);

    @StreamListener
    @SendTo("person-aggregation")
    public KStream<String, PersonAggregation> process(
            @Input("person-input") KTable<String, Person> personInput,
            @Input("address-input") KTable<String, Address> addressInput) {

        KTable<String, AddressAggregation> addressAggregate = addressInput.toStream()
                .peek((key, value) -> LOG.info("addr {}: {}", key, value))
                .groupBy((k, v) -> v.getPersonId(), Grouped.with(null, new AddressSerde()))
                .aggregate(
                        AddressAggregation::new,
                        (key, value, aggregation) -> {
                            aggregate(aggregation, value);
                            return aggregation;
                        }, Materialized.with(Serdes.String(), new AddressAggregationSerde()));

        addressAggregate.toStream()
                .peek((key, value) -> LOG.info("aggregated addr: {}", value));

        return personInput.toStream()
                .leftJoin(addressAggregate, this::join, Joined.with(Serdes.String(), new PersonSerde(), new AddressAggregationSerde()))
                .peek((key, value) -> LOG.info("aggregated person: {}", value));
    }

    private PersonAggregation join(Person person, AddressAggregation addrs) {
        return PersonAggregation.builder()
                .id(person.getId())
                .name(person.getName())
                .addresses(addrs)
                .build();
    }

    public void aggregate(AddressAggregation aggregation, Address address) {
        if (address != null) {
            aggregation.removeIf(it -> Objects.equals(it.getId(), address.getId()));
            if (address.isValid()) {
                aggregation.add(address);
            }
        }
    }
}
```
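For reference, the replace-by-id behaviour I intend with the `aggregate(...)` helper can be sketched in isolation. This is a simplified illustration under assumptions: a plain `List` stands in for `AddressAggregation`, the `Address` class and its `valid` flag are hypothetical substitutes for my real class and its `isValid()` method, and "Newstreet 1, 1111 Elsewhere" is made-up sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class AddressUpsertSketch {

    // Hypothetical stand-in for Address; "valid" replaces the real isValid().
    static final class Address {
        final String id;
        final String address;
        final boolean valid;
        Address(String id, String address, boolean valid) {
            this.id = id; this.address = address; this.valid = valid;
        }
    }

    // Same logic as aggregate(...) above: drop any entry with the same id,
    // then re-add the incoming address only if it is valid.
    static List<Address> upsert(List<Address> aggregation, Address address) {
        if (address != null) {
            aggregation.removeIf(it -> Objects.equals(it.id, address.id));
            if (address.valid) {
                aggregation.add(address);
            }
        }
        return aggregation;
    }

    public static void main(String[] args) {
        List<Address> aggregation = new ArrayList<>();
        upsert(aggregation, new Address("321", "Somestreet 12, 4321 Somewhere", true));
        upsert(aggregation, new Address("432", "Otherstreet 12, 5432 Nowhere", true));
        // Updating id 321 replaces the old entry instead of duplicating it.
        upsert(aggregation, new Address("321", "Newstreet 1, 1111 Elsewhere", true));
        // An invalid address for id 432 removes that entry.
        upsert(aggregation, new Address("432", null, false));
        System.out.println(aggregation.size() + " address left: " + aggregation.get(0).address);
    }
}
```

With these four updates, `main` prints `1 address left: Newstreet 1, 1111 Elsewhere`, which is the per-person behaviour I expect from the address aggregation before it is joined with the person.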