我正在尝试运行这个简单的示例,其中过滤掉来自 Kafka 主题的数据:https ://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam -流式管道/
我与具有默认设置的 localhost 代理进行了类似的设置,但我什至无法阅读该主题。
运行应用程序时,它会陷入无限循环,什么也没有发生。我已经尝试为我的经纪人提供乱码 url,看看它是否能够联系到他们 - 它不是。集群已启动并正在运行,我可以向主题添加消息。这是我指定代理和主题的地方:
pipeline
.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("BEAM_IN")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
)
我没有看到任何错误,并且没有写入输出主题。
调试时,我看到它卡在这个循环中:
while(Instant.now().isBefore(completionTime)) {
ExecutorServiceParallelExecutor.VisibleExecutorUpdate update = this.visibleUpdates.tryNext(Duration.millis(25L));
if (update == null && ((State)this.pipelineState.get()).isTerminal()) {
return (State)this.pipelineState.get();
}
if (update != null) {
if (this.isTerminalStateUpdate(update)) {
return (State)this.pipelineState.get();
}
if (update.thrown.isPresent()) {
Throwable thrown = (Throwable)update.thrown.get();
if (thrown instanceof Exception) {
throw (Exception)thrown;
}
if (thrown instanceof Error) {
throw (Error)thrown;
}
throw new Exception("Unknown Type of Throwable", thrown);
}
}
在 ExecutorServiceParallelExecutor 类的 isKeyed(PValue pvalue) 方法中。
我错过了什么?