0

有两个kafka主题

  • 消息
  • 图片

新闻主题中的消息可以具有以下图像 ID 列表

{ 

    "id": "news-1",
    "title": "Title news-1",
    "description": " description news-1",
    "author": " Author news-1",
    "imageIds": [
        "images-1",
        "images-2"
    ]
}

图片主题中的消息如下所示

{

    "id": "image-1",
    "url": "https://www.mypublication.co.uk/image-title-1.jpeg",
    "description": "title-1 description",
    "height": 400,
    "width": 450
}


{

     "id": "image-2",
     "url": "https://www.mypublication.co.uk/image-title-2.jpeg",
     "description": "title-2 description",
     "height": 400,
     "width": 450
 }

我正在尝试加入这两个流以填充包含所有图像细节的最终新闻消息。

我尝试使用groupBy聚合如下

 KTable<String, Image> images = builder.table(topics.getImagesTopic(), Consumed.with(Serdes.String(), imageSerde));
    KStream<String, News> news = builder.stream(topics.getNewsTopic(), Consumed.with(Serdes.String(), newsSerde));

KTable<String, NewsImages> newsImagesKTable = news.flatMapValues(newsArticle -> newsArticle.getImageIds())
      .map((newsId, imageId) -> new KeyValue<>(imageId, newsId)) // rekey not good !!?
      .join(images, (newsId, image) -> {
        return new ImageWrapper(newsId, image);
      }, Joined.with(Serdes.String(), Serdes.String(), imageSerde))
            .groupBy((imageId, imageWrapper) -> imageWrapper.getNewsId(), Grouped.with(Serdes.String(), imageWrapperSerde))
      .aggregate(NewsImages::new, (newsId, image, newsImages) -> {
        newsImages.setNewsId(newsId);
        newsImages.addImage(image);
        return newsImages;
      }, Materialized.with(Serdes.String(),newsImagesSerde));

newsImagesKTable.toStream().
      to(topics.getNewsImagesTopic());

但正如预期的那样,上面的代码聚合了新闻的所有图像

当作者第一次用两张图片发布新闻时,一切顺利,我们可以看到下面的输出

 "news-1" :
     {
       "newsId":"news-1", 
"images":
    {"image-1":{"id":"image-1","url":"https://www.mypublication.co.uk/image-1.jpeg","description":"title-1 description","height":400,"width":450},
        "image-2":{"id":"image-2","url":"https://www.mypublication.co.uk/image-2.jpeg","description":"title-2 description","height":400,"width":450}}
                }

当作者重新发布只有 image-3 的文章时,它正在输出所有三个图像(这就是聚合器) news-1 :[image-1,image-2,image-3]

我正在寻找任何其他替代方法来加入新闻和图像,并在重新发布新闻 news-1 时覆盖以前的值:[image-3]

4

1 回答 1

0

从 Apache Kafka 2.4 开始,FK-joins 是原生 DSL 运算符:https ://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#ktable-ktable-foreign-key-join

有关更多详细信息,请查看https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#ktable-ktable-foreign-key-join

于 2021-12-05T21:35:46.370 回答