0

我正在尝试运行这个简单的示例,其中过滤掉来自 Kafka 主题的数据:https ://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam -流式管道/

我与具有默认设置的 localhost 代理进行了类似的设置,但我什至无法阅读该主题。

运行应用程序时,它会陷入无限循环,什么也没有发生。我已经尝试为我的经纪人提供乱码 url,看看它是否能够联系到他们 - 它不是。集群已启动并正在运行,我可以向主题添加消息。这是我指定代理和主题的地方:

        pipeline
            .apply(
                    KafkaIO.<Long, String>read()
                            .withBootstrapServers("localhost:9092")
                            .withTopic("BEAM_IN")
                            .withKeyDeserializer(LongDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            )

我没有看到任何错误,并且没有写入输出主题。

调试时,我看到它卡在这个循环中:

        while(Instant.now().isBefore(completionTime)) {
        ExecutorServiceParallelExecutor.VisibleExecutorUpdate update = this.visibleUpdates.tryNext(Duration.millis(25L));
        if (update == null && ((State)this.pipelineState.get()).isTerminal()) {
            return (State)this.pipelineState.get();
        }

        if (update != null) {
            if (this.isTerminalStateUpdate(update)) {
                return (State)this.pipelineState.get();
            }

            if (update.thrown.isPresent()) {
                Throwable thrown = (Throwable)update.thrown.get();
                if (thrown instanceof Exception) {
                    throw (Exception)thrown;
                }

                if (thrown instanceof Error) {
                    throw (Error)thrown;
                }

                throw new Exception("Unknown Type of Throwable", thrown);
            }
        }

在 ExecutorServiceParallelExecutor 类的 isKeyed(PValue pvalue) 方法中。

我错过了什么?

4

0 回答 0