0

我有一个用例,我在其中接收有关某个主题的推文,以及有关其他主题的用户详细信息。我需要从用户详细信息中找到用户名并将其设置为推文。使用以下代码,我可以获得预期的结果。

KStream<String, Tweet> tweetStream = builder
                .stream("tweet-topic",
                        Consumed.with(Serdes.String(),
                                serdeProvider.getTweetSerde()));

        KTable<String, User> userTable = builder.table("user-topic",
                Consumed.with(Serdes.String(),
                        serdeProvider.getUserSerde()));

        KStream<String, Tweet> finalStream = tweetStream.leftJoin(userTable, (tweetDetail, userDetail) -> {
            if (userDetail != null) {
                return tweetDetail.setUserName(userDetail.getName());
            }
            return tweetDetail;
        }, Joined.with(Serdes.String(), serdeProvider.getTweetSerde(),
                serdeProvider.getUserSerde()));

但是,如果 kTable 主题中有 1000 条记录,则处理 100 万条此逻辑需要 2 小时以上。之前需要 2 到 3 分钟。

早些时候,当用户详细信息在本地哈希映射中时,处理所有数据大约需要 10 分钟。有没有其他方法可以避免 LeftJoin 或提高其性能?

4

0 回答 0