0

ItemReader正在从 DB2 读取数据并提供 java 对象ClaimDto。现在ClaimProcessor接收对象ClaimDto并返回CompositeClaimRecord对象,该对象包含claimRecord1并发claimRecord2送到两个不同的 Kafka 主题。如何分别写到topic1claimRecord1claimRecord2topic2。

4

2 回答 2

2

只需编写一个ItemWriter可以做到这一点的自定义。

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    for (CompositeClaimRecord record : items) {
       writer1.write(Collections.singletonList(record.claimRecord1));
       writer2.write(Collections.singletonList(record.claimRecord2));

    }
  }
}

或者不是一次写入 1 条记录,而是将单个列表转换为 2 个列表并将其传递。但是这样处理错误可能有点挑战。\

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
    List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());

    writer1.write(record1List);
    writer2.write(record2List);


  }
}
于 2020-12-14T10:11:38.697 回答
0

您可以使用带有两个s 作为委托的ClassifierCompositeItemWriterKafkaItemWriter(每个主题一个)。

Classifier根据项目的类型(claimRecord1claimRecord2)对项目进行分类,并将它们路由到相应的 kafka 项目编写器(topic1topic2)。

于 2020-12-14T09:25:03.887 回答