I have two Kafka topics:
- news
- images
A message in the news topic can carry a list of image IDs, as follows:
{
  "id": "news-1",
  "title": "Title news-1",
  "description": " description news-1",
  "author": " Author news-1",
  "imageIds": [
    "image-1",
    "image-2"
  ]
}
Messages in the images topic look like this:
{
  "id": "image-1",
  "url": "https://www.mypublication.co.uk/image-title-1.jpeg",
  "description": "title-1 description",
  "height": 400,
  "width": 450
}
{
  "id": "image-2",
  "url": "https://www.mypublication.co.uk/image-title-2.jpeg",
  "description": "title-2 description",
  "height": 400,
  "width": 450
}
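For reference, the two payloads above could map onto value classes roughly like the following. This is a minimal sketch: the class shapes are assumptions, `getImageIds()` is the only accessor the stream code in this question actually calls, and a real JSON serde may additionally need a no-argument constructor or annotations.

```java
import java.util.List;

// Hypothetical value class for the news topic; only the fields the join needs are shown.
class News {
    private final String id;
    private final List<String> imageIds;

    News(String id, List<String> imageIds) {
        this.id = id;
        this.imageIds = List.copyOf(imageIds); // defensive, immutable copy
    }

    String getId() { return id; }
    List<String> getImageIds() { return imageIds; }
}

// Hypothetical value class for the images topic, one field per JSON attribute.
class Image {
    private final String id;
    private final String url;
    private final String description;
    private final int height;
    private final int width;

    Image(String id, String url, String description, int height, int width) {
        this.id = id;
        this.url = url;
        this.description = description;
        this.height = height;
        this.width = width;
    }

    String getId() { return id; }
    String getUrl() { return url; }
    String getDescription() { return description; }
    int getHeight() { return height; }
    int getWidth() { return width; }
}
```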
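I am trying to join these two streams so that the final news message is enriched with the full details of every image it references.
I tried using groupBy and aggregate, as follows: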
// Images as a changelog table keyed by imageId; news as a record stream keyed by newsId.
KTable<String, Image> images = builder.table(topics.getImagesTopic(), Consumed.with(Serdes.String(), imageSerde));
KStream<String, News> news = builder.stream(topics.getNewsTopic(), Consumed.with(Serdes.String(), newsSerde));

KTable<String, NewsImages> newsImagesKTable = news
    // Emit one record per (newsId, imageId) pair.
    .flatMapValues(newsArticle -> newsArticle.getImageIds())
    // Rekey by imageId so the stream can be joined with the images table (forces a repartition).
    .map((newsId, imageId) -> new KeyValue<>(imageId, newsId)) // rekey not good !!?
    .join(images, (newsId, image) -> new ImageWrapper(newsId, image),
        Joined.with(Serdes.String(), Serdes.String(), imageSerde))
    // Key back by newsId and collect the joined images per news item.
    .groupBy((imageId, imageWrapper) -> imageWrapper.getNewsId(), Grouped.with(Serdes.String(), imageWrapperSerde))
    .aggregate(NewsImages::new, (newsId, image, newsImages) -> {
        newsImages.setNewsId(newsId);
        newsImages.addImage(image); // only ever adds; nothing is removed from the aggregate
        return newsImages;
    }, Materialized.with(Serdes.String(), newsImagesSerde));

newsImagesKTable.toStream().to(topics.getNewsImagesTopic());
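But, as expected, the code above accumulates every image that has ever been attached to a news item.
When the author first publishes the news item with two images, everything works and we see the output below: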
"news-1" :
{
"newsId":"news-1",
"images":
{"image-1":{"id":"image-1","url":"https://www.mypublication.co.uk/image-1.jpeg","description":"title-1 description","height":400,"width":450},
"image-2":{"id":"image-2","url":"https://www.mypublication.co.uk/image-2.jpeg","description":"title-2 description","height":400,"width":450}}
}
When the author republishes the article with only image-3, the output contains all three images (that is simply what the aggregator does): news-1: [image-1, image-2, image-3]
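The accumulation can be reproduced without Kafka. The sketch below is plain Java, not Kafka Streams code: a `HashMap` stands in for the state store behind `aggregate`, and the class and method names are made up for illustration. Each publish is split into one record per image ID, and each record is added to whatever aggregate is already stored for that news ID.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AccumulatingAggregateDemo {
    // Stand-in for the state store: newsId -> image IDs aggregated so far.
    private final Map<String, Set<String>> store = new HashMap<>();

    // One publish of a news item: every image ID becomes its own record,
    // and each record is folded into the existing aggregate for that newsId.
    Set<String> publish(String newsId, List<String> imageIds) {
        for (String imageId : imageIds) {
            store.computeIfAbsent(newsId, k -> new LinkedHashSet<>()).add(imageId);
        }
        return store.getOrDefault(newsId, Set.of());
    }

    public static void main(String[] args) {
        AccumulatingAggregateDemo demo = new AccumulatingAggregateDemo();
        // prints [image-1, image-2]
        System.out.println(demo.publish("news-1", List.of("image-1", "image-2")));
        // prints [image-1, image-2, image-3]
        System.out.println(demo.publish("news-1", List.of("image-3")));
    }
}
```

Nothing in the per-record fold knows that a new publish has started, so the earlier images are never dropped.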
I am looking for an alternative way to join news and images so that republishing news-1 overwrites the previous value: [image-3]
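To make the target behaviour concrete, here is a plain-Java sketch of the semantics I am after. It is not a Kafka Streams solution, and all names are hypothetical. The assumption is that every joined record also carries the full image-ID list of the publish it came from, so the fold can drop images that the latest publish no longer lists. It ignores out-of-order records, and a republish with an empty image list produces no records and therefore clears nothing.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OverwritingAggregateDemo {
    // Stand-in for the state store: newsId -> image IDs of the latest publish.
    private final Map<String, Set<String>> store = new HashMap<>();

    // Folds one joined record into the aggregate. currentImageIds is the full
    // list from the publish this record belongs to (assumed to travel with it).
    Set<String> aggregate(String newsId, String imageId, List<String> currentImageIds) {
        Set<String> images = store.computeIfAbsent(newsId, k -> new LinkedHashSet<>());
        images.retainAll(currentImageIds); // drop images the latest publish no longer lists
        images.add(imageId);
        return images;
    }

    // One publish of a news item, split into one record per image ID.
    Set<String> publish(String newsId, List<String> imageIds) {
        for (String imageId : imageIds) {
            aggregate(newsId, imageId, imageIds);
        }
        return store.getOrDefault(newsId, Set.of());
    }

    public static void main(String[] args) {
        OverwritingAggregateDemo demo = new OverwritingAggregateDemo();
        // prints [image-1, image-2]
        System.out.println(demo.publish("news-1", List.of("image-1", "image-2")));
        // prints [image-3]
        System.out.println(demo.publish("news-1", List.of("image-3")));
    }
}
```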