问题标签 [apache-beam-kafkaio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
119 浏览

java - KeyError: 'beam:coders:javasdk:0.1' 与 KafkaIO 和 Python 外部转换

我正在尝试设置一个 Apache Beam Java 管道:

  • 从 Kafka 读取消息
  • 调用外部 Python 转换
  • 将输出写入 Kafka

在此之前,我尝试了没有 Kafka 的简单管道:例如,使用“创建”在 Java 中生成一些测试值,然后将它们传递给虚拟 Python 转换。到目前为止有效。

这是管道代码的摘录:

这是外部转换包装器:

这是我的 Python 转换的摘录:

我这样设置 Python 的扩展服务:

Flink 的作业服务器:

当我像这样运行 Java 管道时:

我得到以下堆栈跟踪:

我不知道为什么这不起作用,而其他示例则起作用。
类型似乎在KafkaIO.readPythonWordCount expand方法之间匹配。

0 投票
0 回答
103 浏览

apache-spark - 如何优雅地关闭 Apache Beam 管道

我有一个在 Spark 上运行的 Apache Beam 管道。管道从 Kafka 主题 (KafkaIO) 读取并写入文件 (FileIO)。我经常需要停止管道并更新一些外部参数。这会导致一些数据丢失。有没有办法优雅地关闭管道?各种排水,以避免数据丢失。

我在 github ( https://github.com/apache/beam/pull/12287/files ) 中看到了这个 MR,但我似乎无法理解如何使用它。任何帮助,将不胜感激

0 投票
1 回答
42 浏览

apache-beam - 如何在 Apache BEAM 中使用 withDynamicRead 和 KafkaIO

我在 Apache Beam 中使用 KafkaIO 读取,我正在尝试调用withDynamicRead。我还有一个基本的 withCheckStopReadingFn 调用:

我收到了这个错误,我无法理解它。有人知道如何正确调用 DynamicRead 吗?我正在使用 Apache Beam 2.29 版

0 投票
0 回答
111 浏览

apache-beam - 根据偏移量或偏移量时间戳停止从 Kafka 消费的 Beam 管道

似乎 Apache Beam (2.29.0) Kafka 管道适用于流式传输,只有在尝试以批处理模式消费时可用的钩子很少。我正在尝试读取在具有开始和结束时间戳的时间范围内给出的丢失消息。很容易弄清楚如何从给定的开始时间开始。为此,我使用了 withStartReadTime 方法,如下所示:

上面的代码使用给定的时间戳调用生产者的 seek 方法,消费者将从等于或大于该起始时间戳的偏移量开始读取分区。

但是,要停止管道,有 3 种可能的方法: withMaxNumRecords 和 withMaxReadTime 和 withCheckStopReadingFn withMaxReadTime 使用持续时间,并且会在给定的时间内读取记录,因此它会停止,因此对于我的目的来说不是确定性的。withMaxNumRecords 我可以计算每个分区的记录数量,我想读取开始时间和结束时间的偏移量,并计算每个分区的记录增量并将它们全部加起来。但是,这也将是不确定的,因为在读取时无法保证消息已在所有分区中均匀读取。一个分区可能已通过目标偏移量读取,而另一个分区未达到目标偏移量。CheckStopReadingFn 的最后一个选项是一个不错的选择。我们目前使用它来暂停管道,但不是基于偏移量。问题在于传递给函数的参数。这是该函数的示例实现。

此方法不采用正在处理的当前偏移量或最后处理的偏移量。因此,当达到给定的偏移量/时间戳时,我不能使用它来停止从分区读取。如果给出这两个偏移量/时间戳,那么实现停止将非常容易。我显然将 KafkaUnboundedReader 用于管道。我看到了对 BoundedReader 的引用,但是我不知道 Kafka 是否支持它以及在我的情况下如何使用它。

我对 Beam 中缺乏对批处理模式的支持感到困惑。我来自 Spark 世界,这类问题会有无数的解决方案。我不知道我是否缺少其他一些 API,或者是否有比 2.29 更高的版本以及我正在寻找的选项。如果有人指出我这样的解决方案,我将不胜感激。

0 投票
1 回答
40 浏览

java - KafkaIO with BootStrapServers

我试图在使用 ValueProvider 执行运行命令时获取服务器 ID 作为参数

对于选项接口中的值提供者:

withBootstrapServers 抛出错误“不兼容的类型:org.apache.beam.sdk.options.ValueProvider 无法转换为 java.lang.String”

这个答案建议使用 options.getBootstrapServers().get() 但这会产生以下错误

非常感谢任何解决此问题的帮助

0 投票
1 回答
65 浏览

python-3.x - GroupByKey() 与 Apache Beam

我正在尝试使用 apache Beam 将消息从 kafka 消费者流式传输到 30 秒窗口。使用 beam_nuggets.io 读取 kafka 主题。

你可以在下面看到我的代码:

GroupByKey 仍然不产生任何输出。

我的消费信息:

GroupByKey() 可以做到这一点,因为我所有消息的密钥都是“无”,如果我错了,请帮忙。谢谢

0 投票
1 回答
37 浏览

apache-beam - 如何在从 Kafka 源读取的 Apache Beam 中模拟事件延迟

我正在尝试在流式 Beam 管道中调整窗口参数。我正在修改的参数是 withAllowedLateness、触发器、间隔、窗格触发等。但是我不知道如何在我的 Kafka 消费管道中触发延迟以测试更改。有人可以建议如何创建事件延迟吗?

谢谢

0 投票
1 回答
35 浏览

apache-spark - 使用 Kafka IO 时 Spark Runner 的 Apache Beam 问题

我正在尝试使用 Spark Runner 测试 KafkaIO 的 Apache Beam 代码。该代码适用于 Direct Runner。

但是,如果我在下面添加代码行,则会引发错误:

错误:

我尝试使用的版本:

0 投票
1 回答
26 浏览

java - 如何在 Apache Beam 中读取 Kafka 记录摄取时间戳

我是 Apache Beam 的新手,在这个问题上挣扎了一段时间。我在 Apache Beam Java 中使用 KafkaIO 作为我的管道源。我想获取 Kafka 记录摄取时间戳以及每条记录,并将其作为附加列写入我的输出。记录在 Kafka 中被摄取的时间戳,而不是事件时间。

如果不使用 withoutMetadata() 函数,我无法弄清楚如何使用 kafkaIOReader 。据我了解,Kafka 记录摄取时间戳应该是每条记录的元数据的一部分?

0 投票
0 回答
20 浏览

apache-flink - 从 Kinesis 流读取时 Apache 束错误的事件时间

我正在尝试构建一个实时管道来处理来自 Kinesis 流的 JSON 事件并将它们聚合到固定窗口上 - 例如。每 5 分钟计算一次平均值。记录如下所示:

我在 Flink Runner 上使用 Apache Beam 并创建了一个管道,该管道将根据事件时间处理事件,如下所示:

管道的最后一步应该是一个窗口函数,但为简单起见将其省略。代码原样引发异常:

通过查看 KinesisIO 代码,没有设置时间戳功能的功能,但记录的进入时间用作时间戳。与 KafkaIO 或 AvroIo 相比,您可以覆盖从记录中提取事件时间的方式,而 KinesisIo 则没有这样的事情。

有谁知道这个问题的任何解决方法?

谢谢


KafkaIO - Apache Beam 的相关问题:使用 Withtimestamp 分配事件时间时出错