0

加入两个 KTable 时,我面临空指针异常。


final ValueJoiner<topicA, topicB, user> userJoiner = (topicA, topicB) -> {
            LOG.debug(LOG_PREFIX + "Begin of join of topicA with topicB");
            LOG.trace(LOG_PREFIX + "topicA: {}", topicA);
            LOG.trace(LOG_PREFIX + "topicB: {}", topicB);
            //Construct a new user object after joining
            LOG.debug(LOG_PREFIX + "End of Join");
            LOG.trace(LOG_PREFIX + "user : {}", user_tmp);
            return user_tmp;
        };

        final KTable<String, user> userTable = topicA.join(topicB,
                topicA::getPWDRIDOWNER,
                userJoiner,
                Named.as("topicA-join-with-topicB"),
                Materialized.<String, user, KeyValueStore<Bytes, byte[]>>as("topicA-join-with-topicB").withKeySerde(Serdes.String()).withValueSerde(userSerde).withCachingDisabled());

        final KStream<String, user> userOutputStream = userTable.toStream().selectKey((k,v) -> {
            LOG.debug(LOG_PREFIX + "Selecting key for outgoing record");
            LOG.trace(LOG_PREFIX + "Selecting key for outgoing record ");
            LOG.trace(LOG_PREFIX + "user record : {} ", v);
            return v.getFieldA()+"_"+v.getFieldB()+"_"+v.getFieldC();
        });

当主题 A 中存在记录但主题 B 中没有匹配记录时,会出现空指针。

使用跟踪日志,我可以看到以下日志打印了一个 null

LOG.trace(LOG_PREFIX + "user record : {} ", v);

这可能表明连接没有被触发,但它仍然会随着拓扑的执行而继续。因此,在加入之后,它开始执行“selectKey”节点并运行到 NPE。

据我了解,如果未触发联接,则应在拓扑中等待。

请任何人帮助了解可能出现的问题

4

0 回答 0