ItemReader
正在从 DB2 读取数据并提供 java 对象ClaimDto
。现在ClaimProcessor
接收对象ClaimDto
并返回CompositeClaimRecord
对象,该对象包含claimRecord1
并发claimRecord2
送到两个不同的 Kafka 主题。如何分别写到topic1claimRecord1
和claimRecord2
topic2。
问问题
347 次
2 回答
2
只需编写一个ItemWriter
可以做到这一点的自定义。
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
for (CompositeClaimRecord record : items) {
writer1.write(Collections.singletonList(record.claimRecord1));
writer2.write(Collections.singletonList(record.claimRecord2));
}
}
}
或者不是一次写入 1 条记录,而是将单个列表转换为 2 个列表并将其传递。但是这样处理错误可能有点挑战。\
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
writer1.write(record1List);
writer2.write(record2List);
}
}
于 2020-12-14T10:11:38.697 回答
0
您可以使用带有两个s 作为委托的ClassifierCompositeItemWriterKafkaItemWriter
(每个主题一个)。
将Classifier
根据项目的类型(claimRecord1
或claimRecord2
)对项目进行分类,并将它们路由到相应的 kafka 项目编写器(topic1
或topic2
)。
于 2020-12-14T09:25:03.887 回答