我有一个用例,我在其中接收有关某个主题的推文,以及有关其他主题的用户详细信息。我需要从用户详细信息中找到用户名并将其设置为推文。使用以下代码,我可以获得预期的结果。
KStream<String, Tweet> tweetStream = builder
.stream("tweet-topic",
Consumed.with(Serdes.String(),
serdeProvider.getTweetSerde()));
KTable<String, User> userTable = builder.table("user-topic",
Consumed.with(Serdes.String(),
serdeProvider.getUserSerde()));
KStream<String, Tweet> finalStream = tweetStream.leftJoin(userTable, (tweetDetail, userDetail) -> {
if (userDetail != null) {
return tweetDetail.setUserName(userDetail.getName());
}
return tweetDetail;
}, Joined.with(Serdes.String(), serdeProvider.getTweetSerde(),
serdeProvider.getUserSerde()));
但是,如果 kTable 主题中有 1000 条记录,则处理 100 万条此逻辑需要 2 小时以上。之前需要 2 到 3 分钟。
早些时候,当用户详细信息在本地哈希映射中时,处理所有数据大约需要 10 分钟。有没有其他方法可以避免 LeftJoin 或提高其性能?