问题标签 [apache-beam-kafkaio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
56 浏览

java - Apache Beam Kafka 源连接器空闲分区问题与“CustomTimeStampPolicyWithLimitedDelay”

源是我们的光束管道的 kafka。即使任何分区空闲,Apache beam 的 kafka IO 连接器也支持移动水印(在 flink runner 的情况下)。想要根据包含在有效负载中的数据包的时间戳来处理数据包的应用程序会想要使用“CustomTimestampPolicyWithLimitedDelay”。我们使用 FIXED WINDOWS 一分钟进行聚合,这取决于时间的概念。因此,如果时间没有正确推进,则不会调用聚合函数并且会丢失数据。

此 API 存在功能问题。因此,当应用程序初始化时,让我们以 Topic a 为例,它被用作具有三个分区的源。采取了这些步骤来重现该问题:

  1. 仅以任意 x 秒的频率将数据泵入一个分区,观察结果是聚合函数即使在几分钟后也不会被调用。
  2. 现在将数据泵送到所有分区,观察结果是按预期在每分钟结束时调用聚合函数。
  3. 现在只将数据泵送到一个分区,并且直到在那之前的一分钟结束,这样我们就可以生成一个空闲的分区场景并观察它现在是否按预期工作。

所以总结是这个api有一个初始化问题,它没有提前时间,但在第2步之后它稳定并按预期工作。

这很容易重现,并且会要求 apache beam 来解决这个问题。

到目前为止,我们采用的临时修复方法是使用 LogAppendTime,它可以完美运行,但由于各种应用程序需要,我们不想在代理时间处理数据包。

0 投票
1 回答
213 浏览

python - Apache Beam HTTP 无界源 Python

当前版本的 Apache Beam 是否可以开发一个无界源来接收 HTTP 消息中的数据?我的目的是运行 HTTP 服务器并将收到的消息注入 Beam 管道。如果可能的话,可以用现有的资源来完成吗?

0 投票
0 回答
126 浏览

apache-beam - KafkaIO - 与 groupId 一起使用时 enable.auto.commit 设置为 true 和 commitOffsetsInFinalize 的不同行为

我们有一个 Apache Beam 管道,它从给定的 kafka 主题读取消息并进行进一步处理。我的管道使用 FlinkRunner,我描述了我们尝试过的三种不同情况:

案例 1:未指定组 ID:

Beam 为每次运行创建一个新的消费者,从而从最新的主题偏移中读取。它读取消费者启动后产生的消息。在这种情况下,在管道停止和重新启动之间的时间间隔内可能存在潜在的数据丢失

案例 2:指定组 id 并将 enable.auto.commit 设置为 true Beam 从管道停止时开始重新处理消息,并开始读取给定 groupid 未提交给 kafka 的消息。

新组 id 再次开始监听来自最新主题偏移量的消息并开始提交消息

案例 3:使用 commitOffsetsInFinalize() 指定的组 id

理想情况下,我希望这里的行为与案例 2 相同,但我看到的行为类似于案例 1,在管道停止和重新启动之间存在潜在的数据丢失。

从 KafkaIO 的文档中,我确实看到当检查点按以下方式完成时,偏移量被提交回 kafka:https ://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/ java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1098

我们想了解:

  1. 为什么案例 2 在停止和重新启动管道时的行为不像案例 3?
  2. 什么情况下我们应该设置enable.auto.commit为 true vs commitOffsetsinFinalize
0 投票
1 回答
385 浏览

java - GCP 数据流 Kafka(作为 Azure 事件中心)-> 大查询

TDLR;

我有一个支持 Kafka 的 Azure 事件中心,我试图从 Google Cloud 的 Dataflow 服务连接到它,以将数据流式传输到 Google Big Query。我可以成功地使用 Kafka CLI 与 Azure 事件中心对话。但是,使用 GCP,5 分钟后,我在 GCP 数据流作业窗口中收到超时错误。

启用 Kafka 的 Azure EH -> GCP 数据流 -> GCP 大查询表

细节

要设置启用 Kafka 的事件中心,我按照此 GitHub 页面上的详细信息进行操作。它让开发人员添加一个jaas.confclient_common.properties。其中jaas.conf包括对登录模块的引用以及用户名/密码。带有 Kafka 的事件中心的用户名是$ConnectionString. 密码是从 CLI 复制的连接字符串。client_common.properties包含两个标志:security.protocol=SASL_SSLsasl.mechanism=PLAIN。通过配置这些文件,我可以使用 Kafka CLI 工具和 Azure 事件中心发送和接收数据。我可以通过 Azure 事件中心看到从生产者到消费者的数据流。

我为 Kafka -> Big Query修改了Google 的数据流模板。已经为重置偏移量指定了配置映射。我添加了其他配置以匹配 Azure 事件中心与 Kafka 教程。虽然不是最佳实践,但我将连接字符串添加到密码字段以进行测试。当我将它上传到 GCP 数据流引擎并运行该作业时,我每 5 分钟在日志中收到超时错误,并且在 Google Big Query 中没有任何结果。

作业命令

GCP 数据流中的错误

更新配置

管道

相关问题

0 投票
0 回答
81 浏览

apache-kafka - 如何在 Apache Beam 中使用带有 protobuf 定义的 Kafka 消息?

我在 DataFlow 上运行的 Apache Beam 管道中使用 KafkaIO 无界源。以下配置对我有用

现在,由于我对主题中的消息有一个 protobuf 定义,我想用它来转换 Java 对象中的 kafka 记录。

以下配置不起作用,需要编码器:

不幸的是,我找不到正确的 Value Deserializer + Coder 组合,也无法在文档中找到类似的示例。你有在 Apache Beam 中使用 Protobuf 和 Kafka 源代码的工作示例吗?

0 投票
1 回答
85 浏览

apache-beam - 当一个主题有多个主题时,如何处理来自 Kafka(使用 Apache Beam)的 Avro 输入?

为了使用 Apache Beam 处理 Avro 编码的消息KafkaIO,需要传递一个 的实例ConfluentSchemaRegistryDeserializerProvider作为值反序列化器。

一个典型的例子如下所示:

但是,我想使用的一些 Kafka 主题有多个不同的主题(事件类型)(出于排序原因)。因此,我无法提前提供一个固定的主题名称。如何解决这个困境?

(我的目标是最终BigQueryIO将这些事件推送到云端。)

0 投票
1 回答
69 浏览

python - 基于 Apache Beam KafkaIO python SDK 处理 Avro 消息

我目前正在尝试将消息读取为类似于以下示例KafkaIO 的ByteDeserializer。我的测试设置如下:

选项 1:配置为使用 --runner=PortableRunner

方案二:启动本地的 flink 作业服务器,

发布测试 kafka avro 消息

管道参数定义为,

管道设置,

当我执行管道时,使用默认的扩展服务 SDK 映像(apache/beam_python3.7_sdk:2.29.0)并将作业提交到 flink 作业服务器。flink 作业服务器失败,并显示消息“无法提交 JobGraph”和“Rest 端点关闭”。

我会错过管道的任何运行时配置吗?

0 投票
1 回答
230 浏览

python - Beam Kafka 流输入,无输出打印或文本

我正在尝试通过使用直接运行器来计算 kafka 消息密钥。

如果我在 ReadFromKafka 中输入 max_num_records =20,我可以看到打印或输出为文本的结果。喜欢:

但是如果没有 max_num_records,或者如果 max_num_records 大于 kafka 主题中的消息数,程序会继续运行但没有输出。如果我尝试使用 beam.io.WriteToText 进行输出,则会创建一个空的临时文件夹,例如:beam-temp-StatOut-d16768eadec511eb8bd897b012f36e97

终端显示:

如果我将 'enable.auto.commit': 'true' 放入 kafka 消费者配置中,则消息已提交,同一组中的其他客户端无法读取它们,因此我认为它正在成功读取,只是没有处理或输出。

我尝试了固定时间,滑动时间窗口,有或没有不同的触发器,没有任何变化。

尝试了 flink runner,得到了与直接 runner 相同的结果。

不知道我做错了什么,有帮助吗?

环境:centos 7

蟒蛇

蟒蛇 3.8.8

java 1.8.0_292

梁 2.30

代码如下:

0 投票
1 回答
206 浏览

python - GRPC 错误 Docker Mac - 使用 Python、Beam 和 Flink 进行 Kafka 流处理

更新:我启动了一个 EC2 实例,并且能够让下面的示例正常工作,这证实了这是 Mac 上 Docker 的连接问题。

更新:即使我关闭了 Flink 服务器容器和 Kafka,我仍然会遇到这个错误,这导致我相信这是一个连接问题

我最近尝试使用教程tutorial使用 Python、Apache Beam 和 Apache Flink 处理 Kafka Stream 。根据教程,我使用以下命令设置 Flink

这样做会导致以下结果:

使用python main.py(如下所示)运行我的脚本时,出现以下错误:

有谁知道这个的快速解决方法?我应该注意我发现了这个

主文件

0 投票
0 回答
67 浏览

apache-kafka - Apache Beam:从存储数据的多个主题中读取

我需要从多个 Kafka 主题中读取按时间计算的起始偏移量,按时间戳对它们进行排序并发送到另一个 kafka 主题。所有 kafka 主题都有 1 个分区。

再举一个例子来更好地描述用例。

我们有inputTopic1, inputTopic2outputTopic。我们需要使用inputTopics过去 12 小时的数据并继续使用实时数据。所有消耗的数据都需要排序并发布到outputTopic.

我试图创建自定义windowFn来处理这个用例,但是我在output主题中得到了无序的数据。

我有 2 个问题。

我是否选择了正确的方法来解决这个问题?我是否以正确的方式实施它?

管道

自定义窗口Fn

列表组合器

我也阅读了及时有状态的处理文章,但我不确定它如何用于以下任务。