问题标签 [apache-beam-kafkaio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
68 浏览

apache-beam - 检查点/容错工作的概念如何在 Apache Beam 中工作?

我正在研究 apache 光束流管道,其中 Kafka 生产者作为输入,消费者作为输出。谁能帮我解决 apache-beam 中的检查点问题

0 投票
1 回答
189 浏览

apache-flink - 通过客户端提交程序或在 TestEnvironment 上下文中运行时,不能使用 RemoteEnvironment

我试图执行以 Kafka 作为输入和输出的 apache-beam 字数。但是在将jar提交到flink集群时,出现了这个错误-

我用来提交jar的命令-

0 投票
1 回答
586 浏览

apache-beam - 消费者组中的 Apache Beam KafkaIO 消费者被分配了唯一的组 ID

我正在使用 DirectRunner 运行多个 Apache Beam KafkaIO 实例,这些实例从同一主题中读取。但是消息正在传递到所有正在运行的实例。在看到我发现的 Kafka 配置后,组名会附加一些唯一的前缀,并且每个实例都有唯一的组名。

  1. group.id = Reader- 0_offset_consumer_559337182_my_group
  2. group.id = Reader- 0_offset_consumer_559337345_my_group

因此,每个实例都分配了唯一的group.id,这就是消息被传递到所有实例的原因。

那么我必须提供什么配置才能使组中的所有消费者都不会阅读相同的消息

0 投票
2 回答
526 浏览

google-cloud-platform - 使用来自 Google Pubsub 的消息并将其发布到 Kafka

我正在尝试使用同步 PULL API 使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们在 GCP 之外运行这个应用程序。

我面临的问题是消费的消息没有在 GCP PubSub 中得到确认。我已经确认本地 Kafka 实例具有来自 GCP PubSub 的消息。GCP DataFlow 中的文档表明,当管道使用数据接收器(在我的情况下为 Kafka)终止时,数据包已完成。

但是由于代码是在 Apache Flink 而不是 GCP DataFlow 中运行的,我认为某种回调不会被触发与确认提交的消息相关。
我在这里做错了什么?

0 投票
1 回答
141 浏览

timestamp - Estimating Watermark for Event Time in Beam

Im trying to use Beam to aggregate over a set of data using event time from the data and Kafka as data source. This works if all my kafka partitions are populated with data. However as soon as a partition has not yet been written to, the watermark cant be estimated and advanced.My TimeStampPolicy is the following:

With 3 Kafka partitions with only one populated with data my logs show me these watermarks:

With default triggering my windows wont fire. My guess is that the output watermark is the minimum over the watermarks of the partitions. And therefore wont advance as long as some of my partitions are empty. How can i handle empty partitions with event time processing?

0 投票
1 回答
339 浏览

apache-flink - Apache Beam 中的 commitOffsetsInFinalize() 和复选标记

我正在开发一个使用 KafkaIO 作为输入的 Beam 应用程序

我试图了解它的commitOffsetsInFinalize()工作原理。

如何完成流式传输作业?管道中的最后一步是自定义 DoFn,它将消息写入DynamoDb. 有没有办法在那里手动调用一些finalize()方法,以便在每次成功执行后提交偏移量DoFn

我也很难理解检查点和最终确定之间的关系是什么?如果管道上没有启用检查点,我仍然能够完成并开始commitOffsetsInFinalize()工作吗?

ps 管道现在的方式,即使commitOffsetsInFinalize()每条消息都被读取,无论下游是否有故障正在提交,因此导致数据丢失。

谢谢!

0 投票
0 回答
46 浏览

apache-beam-io - 重启后的KafkaIO GroupId

我正在使用 Apache Beam 的 KafkaIO 来读取 Kafka 主题。一切都按预期工作,但如果我的工作被终止并重新启动,新工作会生成一个新的 groupID,因此它最终会从主题的开头读取。

换句话说,如果我的初始工作有group.id = Reader-0_offset_consumer_11111111_my_group作为 groupID,那么下一个工作可能最终有group.id = Reader-0_offset_consumer_22222222_my_group这个。如您所见,在我指定的 my_group之前添加了一些独特的前缀。有什么办法可以避免这种情况并每次都保持相同的组 id?谢谢

0 投票
1 回答
223 浏览

google-cloud-dataflow - KafkaIO 检查点持久性与 Google Dataflow Runner

我试图了解偏移量和组管理如何与带有 KafkaIO 阅读器的 Google Dataflow 运行器一起工作。更具体地说,我试图了解偏移管理的工作原理:

  • 如果设置了 group.id 配置并且禁用了自动提交和 commitOffsetsInFinalize。
  • 如果未设置 group.id 配置,偏移量和组管理如何工作?

任何指向正确方向的代码/文档参考都值得赞赏。

0 投票
1 回答
148 浏览

apache-kafka - Apache Beam、KafkaIO 至少一次语义

我们正在实施一个从 Kafka 读取并写入 BigQuery 的试点。

简单的管道:

  • KafkaIO.read
  • BigQueryIO.write

我们关闭了自动提交。我们正在使用 commitOffsetsInFinalize()

如果 BigQueryIO 端一切正常,此设置能否保证消息在 BigQuery 中至少出现一次并且不会丢失?

在文档中commitOffsetsInFinalize()我遇到了以下内容:  

它有助于在从头开始重新启动管道时最小化记录的间隙或重复处理

我很好奇这里指的是什么“差距”?

如果您考虑边缘情况,是否有可能跳过消息而不将其传递给 BQ?

0 投票
1 回答
442 浏览

apache-kafka - 使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka

我有下面的代码,用于使用python beam sdk连接到 kafka 。我知道ReadFromKafka转换是在 java sdk 线束(docker 容器)中运行的,但我无法弄清楚如何在 sdk 线束的 docker 环境中进行制作ssl.truststore.location和访问。ssl.keystore.location论据job_endpoint指向java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081

我尝试将图像覆盖选项指定为--sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'-beam_java_sdk:latest我基于apache/beam_java11_sdk:2.27.0的 docker 图像在哪里,它在其 entrypoint.sh 中提取凭据。但是 Beam 似乎没有使用它,我明白了

在日志中。很快不可避免地紧随其后

总之,我的问题是,在 Apache Beam 中,是否可以在 python 梁 sdk 的 java sdk 工具 docker 容器中提供文件?如果是这样,它会如何完成?

非常感谢。