加入两个 KTable 时,我面临空指针异常。
final ValueJoiner<topicA, topicB, user> userJoiner = (topicA, topicB) -> {
LOG.debug(LOG_PREFIX + "Begin of join of topicA with topicB");
LOG.trace(LOG_PREFIX + "topicA: {}", topicA);
LOG.trace(LOG_PREFIX + "topicB: {}", topicB);
//Construct a new user object after joining
LOG.debug(LOG_PREFIX + "End of Join");
LOG.trace(LOG_PREFIX + "user : {}", user_tmp);
return user_tmp;
};
final KTable<String, user> userTable = topicA.join(topicB,
topicA::getPWDRIDOWNER,
userJoiner,
Named.as("topicA-join-with-topicB"),
Materialized.<String, user, KeyValueStore<Bytes, byte[]>>as("topicA-join-with-topicB").withKeySerde(Serdes.String()).withValueSerde(userSerde).withCachingDisabled());
final KStream<String, user> userOutputStream = userTable.toStream().selectKey((k,v) -> {
LOG.debug(LOG_PREFIX + "Selecting key for outgoing record");
LOG.trace(LOG_PREFIX + "Selecting key for outgoing record ");
LOG.trace(LOG_PREFIX + "user record : {} ", v);
return v.getFieldA()+"_"+v.getFieldB()+"_"+v.getFieldC();
});
当主题 A 中存在记录但主题 B 中没有匹配记录时,会出现空指针。
使用跟踪日志,我可以看到以下日志打印了一个 null
LOG.trace(LOG_PREFIX + "user record : {} ", v);
这可能表明连接没有被触发,但它仍然会随着拓扑的执行而继续。因此,在加入之后,它开始执行“selectKey”节点并运行到 NPE。
据我了解,如果未触发联接,则应在拓扑中等待。
请任何人帮助了解可能出现的问题