问题标签 [apache-beam-kafkaio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
576 浏览

google-cloud-dataflow - 消费者组中的 Apache Beam KafkaIO 消费者阅读相同的消息

我在数据流中使用 KafkaIO 来读取来自一个主题的消息。我使用以下代码。

我使用直接运行器在本地运行数据流程序。一切运行良好。我并行运行同一程序的另一个实例,即另一个消费者。现在我在管道处理中看到重复的消息。

虽然我已经提供了消费者组 ID,但使用相同的消费者组 ID(同一程序的不同实例)启动另一个消费者不应该处理另一个消费者处理的相同元素,对吗?

使用数据流运行器结果如何?

0 投票
1 回答
119 浏览

apache-kafka - Apache Beam KafkaIO 生产者将不同的消息路由到不同的主题

我有一个用例,其中传入的数据有一个标识不同类型数据的键。有一个单一的输入 kafka 主题,所有类型的数据都被抛出。光束管道从输入的 kafka 主题读取所有消息,并且必须根据密钥路由到不同的 kafka 主题。

目前,KafkaIO 不支持使用单个生产者写入多个主题。以下代码是内部工作代码KafkaIO.write()

如何使用 apache Beam 的 kafkaIO 生产者来做到这一点?

0 投票
1 回答
1235 浏览

java - 如何使用 KafkaIO 设置 AvroCoder 和使用 Java 设置 Apache Beam

我正在尝试创建一个管道,将数据从 Kafka 主题流式传输到谷歌的 Bigquery。主题中的数据在 Avro 中。

我调用了 apply 函数 3 次。一次从 Kafka 读取,一次提取记录,一次写入 Bigquery。这是代码的主要部分:

运行时,我收到以下错误:

如何设置编码器以正确读取 Avro?

0 投票
1 回答
199 浏览

java - 如何在 Apache Beam 中使用 KafkaIO 指定 kafka 代理

我正在尝试设置 KafkaIO 管道,但我不知道如何指定代理。指定经纪人名称和端口似乎并没有这样做。我从来没有指定我的 kafka 集群在哪里:

这给了我错误:

我的 Kafka 集群在本地运行,并且代理 ID 正确。

我错过了什么?指定经纪人的正确方法是什么?

0 投票
0 回答
205 浏览

apache-beam - 带有 Apache Beam 的 KafkaIO 在 DirectRunner 上陷入无限循环

我正在尝试运行这个简单的示例,其中过滤掉来自 Kafka 主题的数据:https ://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam -流式管道/

我与具有默认设置的 localhost 代理进行了类似的设置,但我什至无法阅读该主题。

运行应用程序时,它会陷入无限循环,什么也没有发生。我已经尝试为我的经纪人提供乱码 url,看看它是否能够联系到他们 - 它不是。集群已启动并正在运行,我可以向主题添加消息。这是我指定代理和主题的地方:

我没有看到任何错误,并且没有写入输出主题。

调试时,我看到它卡在这个循环中:

在 ExecutorServiceParallelExecutor 类的 isKeyed(PValue pvalue) 方法中。

我错过了什么?

0 投票
2 回答
537 浏览

google-bigquery - 如何从 Apache Beam KafkaIO 中的 kafka 主题推断 avro 模式

我正在使用 Apache Beam 的 kafkaIO 从 Confluent 模式注册表中具有 avro 模式的主题中读取数据。我能够反序列化消息并写入文件。但最终我想写信给 BigQuery。我的管道无法推断架构。如何提取/推断架构并将其附加到管道中的数据,以便我的下游进程(写入 BigQuery)可以推断架构?

这是我使用模式注册表 url 设置反序列化器以及从 Kafka 读取的代码:

我最初认为这足以让 beam 推断架构,但它并没有因为 hasSchema() 返回 false。

任何帮助,将不胜感激。

0 投票
0 回答
303 浏览

apache-beam - 如何使用 Apache Beam 从 Confluent Schema Registry 推断模式?

我正在尝试创建一个 Apache Beam 管道,我从一个 kafka 主题中读取并将其加载到 Bigquery 中。使用 Confluent 的模式注册表,我应该能够在加载到 Bigquery 时推断模式。但是,加载失败时不会推断架构。

下面是整个管道代码。

运行此程序时,我收到以下错误,这是因为我正在调用 useBeamSchema() 并且 hasSchema() 返回 false:

0 投票
1 回答
239 浏览

apache-beam - Apache Beam KafkaIO 提到主题分区而不是主题名称

Apache Beam KafkaIO 支持 kafka 消费者仅从指定分区读取。我有以下代码。

我有以下两个问题。

  1. 如何从 kafka 获取分区名称?我如何在 kafkaIO 中提及它?
  2. Apache Beam 生成的 kafka 消费者数量是否等于创建 kafka 消费者时提到的分区列表?
0 投票
2 回答
728 浏览

apache-flink - 使用 flink runner 时如何在 apache Beam 中执行检查点?

我正在阅读未绑定的源(Kafka)并将其字数写入其他 Kafka 主题。现在我想在光束管道中执行检查点。我已按照 apache 梁文档中的所有说明进行操作,但即使在此之后也没有创建检查点目录。

以下是我用于管道的参数:-

谁能帮我检查点?

0 投票
1 回答
311 浏览

apache-kafka - 在 apache-beam 中写入多个 Kafka 主题?

我正在执行一个简单的字数统计程序,其中我使用一个 Kafka 主题(生产者)作为输入源,然后我对其应用 pardo 以计算字数。现在我需要帮助根据它们的频率写出不同主题的词。假设所有频率均匀的单词将转到主题 1,其余的将转到主题 2。

谁能帮我举个例子?