问题标签 [spark-kafka-integration]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2742 浏览

apache-spark - Spark Structured Streaming 中的 KafkaSource 的“偏移量已从 X 更改为 0”错误是什么?

我在带有检查点的 Spark Structured Streaming 应用程序中使用 KafkaSource 收到错误“偏移量已从 X 更改为 0,某些数据可能已丢失”,但它似乎实际上并没有引起任何问题。我试图弄清楚错误的实际含义。

我的设置如下。

  • 我在 docker 容器中运行了 Kafka(0.10.1.0),并在 /tmp/kafka-logs 上安装了一个命名卷,以便在重新启动之间保留日志。

  • 我在另一个 docker 容器中有一个 Spark Structured Streaming (2.1.1) 应用程序。流使用来自 Kafka 的数据。他们还在重新安装在命名卷中的位置使用检查点,以确保元数据在重新启动之间保持不变。

  • 我使用实现ForeachWriter接口的自定义接收器,这意味着我必须实现自己的已处理版本日志,以便在一切重新启动时,我可以告诉 Spark Streaming 不要重新处理已处理的内容。

所有这一切都运作良好,数据从 Kafka 正确使用,我的自定义接收器正确处理它。

现在如果我杀死Spark Streaming应用程序,让Kafka中的数据堆积然后重新启动Spark Streaming,它会抛出以下错误,表明Kafka中的某些数据不再可用

但是在抛出错误之后,我看到我的流正常启动。Spark Streaming 正确地将堆积在 Kafka 中的数据推送到我的自定义接收器,并具有预期的版本。然后我的接收器继续并正确处理新数据。

因此,该错误表明某些数据在 Kafka 中不再可用,但仍设法被 Spark Streaming 正确使用。

如果我重新启动 Spark Streaming 应用程序,即使没有数据被推送到 Kafka,我也会再次收到相同的错误。如果我开始将新数据推送到 Kafka,系统将继续正确处理它。

有人知道这里会发生什么吗?我是否错误地解释了错误?

0 投票
1 回答
1069 浏览

scala - Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理

如果我的 Kafka 主题收到类似的记录

我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:

目前,我使用 foreachwriter 处理记录如下:

代码工作得很好。但是,我想做的是按通道对传入数据进行分区,以便每个工作人员负责特定的通道,并且我在 handle() 块内进行与该通道相关的内存计算。那可能吗 ?如果是,我该怎么做?

0 投票
1 回答
3989 浏览

apache-spark - 如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

我在这里浏览了 Spark 结构化流式传输 - Kafka 集成指南。

在这个链接上被告知

enable.auto.commit:Kafka 源不提交任何偏移量。

那么,一旦我的 spark 应用程序成功处理了每条记录,我该如何手动提交偏移量呢?

0 投票
2 回答
2136 浏览

apache-spark - Spark Structured Streaming Kafka 错误——偏移量已更改

我的 Spark Structured Streaming 应用程序运行了几个小时,然后出现此错误而失败

当然,偏移量每次都不同,但第一个总是大于第二个。主题数据不能过期,因为主题的保留期是 5 天,我昨天重新创建了这个主题,但今天又出现错误。从中恢复的唯一方法是删除检查点。

Spark 的 Kafka 集成指南failOnDataLoss在选项下提到:

当数据可能丢失(例如,主题被删除或偏移量超出范围)时是否使查询失败。这可能是一个误报。当它不能按预期工作时,您可以禁用它。如果由于丢失数据而无法从提供的偏移量中读取任何数据,批量查询将始终失败。

但是我找不到任何关于何时可以将其视为误报的更多信息,所以我不知道设置是否安全,failOnDataLoss或者false我的集群是否存在实际问题(在这种情况下,我们实际上会失败数据)。

更新:我调查了 Kafka 日志,在 Spark 失败的所有情况下,Kafka 都记录了几条这样的消息(我假设每个 Spark 消费者都有一条消息):

0 投票
4 回答
6935 浏览

apache-spark - How to set group.id for consumer group in kafka data source in Structured Streaming?

I want to use Spark Structured Streaming to read from a secure kafka. This means that I will need to force a specific group.id. However, as is stated in the documentation this is not possible. Still, in the databricks documentation https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl, it says that it is possible. Does this only refer to the azure cluster?

Also, by looking at the documentation of the master branch of the apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md, we can understand that such functionality is intended to be added at later spark releases. Do you know of any plans of such a stable release, that is going to allow setting that consumer group.id?

If not, are there any workarounds for Spark 2.4.0 to be able to set a specific consumer group.id?

0 投票
2 回答
1108 浏览

apache-spark - Spark 结构化流式处理 Kafka 偏移管理

我正在研究将 kafka 偏移量存储在 kafka 内部以用于 Spark Structured Streaming,就像它适用于 DStreamsstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)一样,我正在寻找相同的东西,但适用于 Structured Streaming。是否支持结构化流式传输?如果是,我该如何实现?

我知道使用 的 hdfs 检查点.option("checkpointLocation", checkpointLocation),但我对内置偏移管理完全感兴趣。

我期望 kafka 仅在没有 spark hdfs 检查点的情况下将偏移量存储在内部。

0 投票
2 回答
6360 浏览

scala - 具有 SASL_SSL 身份验证的 Kafka Spark 结构化流

我一直在尝试使用 Spark Structured Streaming API 通过 SASL_SSL 连接到 Kafka 集群。我已将 jaas.conf 文件传递​​给执行者。似乎我无法设置密钥库和信任库身份验证的值。

我尝试传递 thisspark链接中提到的值

另外,尝试通过此链接中的代码传递它

仍然没有运气。

这是日志

代码

火花提交

jaas.conf

任何帮助将不胜感激。

0 投票
1 回答
2420 浏览

apache-spark - Spark 3.x 在 Python 中与 Kafka 的集成

带有 spark-streaming 的 Kafka 会引发错误:

我已经设置了一个 kafka 代理和一个包含一个 master 和一个 worker 的工作 spark 环境。

我假设缺少与 kafka ans 相关的错误,特别是版本。有人能帮忙吗?

火花版本:版本 3.0.0-preview2

我执行:

0 投票
1 回答
457 浏览

apache-spark - PySpark:将 Kafka 主题写入控制台失败

正在从 Kafka 主题获取消息并将其写入控制台。阅读消息不是问题,能够阅读消息并打印架构。但是当我试图将它写入控制台时,它失败了。任何建议都会有所帮助。

下面是我的代码,

下面是错误,

0 投票
1 回答
1878 浏览

scala - Spark Structured Streaming 检查点在生产中的使用

在使用 Spark 结构化流时,我很难理解检查点的工作原理。

我有一个生成一些事件的 spark 进程,我将这些事件登录到 Hive 表中。对于这些事件,我在 kafka 流中收到确认事件。

我创建了一个新的火花过程

  • 将 Hive 日志表中的事件读入 DataFrame
  • 使用 Spark Structured Streaming 将这些事件与确认事件流连接起来
  • 将连接的 DataFrame 写入 HBase 表。

我在 spark-shell 中测试了代码,它工作正常,低于伪代码(我使用的是 Scala)。

现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力理解如何在这里使用检查点。

在此代码的每次运行中,我只想从流中读取我在上一次运行中尚未读取的事件,并将这些新事件与我的日志表内部连接,以便仅将新数据写入最终 HBase桌子。

我在 HDFS 中创建了一个目录来存储检查点文件。我将该位置提供给用于调用 spark 代码的 spark-submit 命令。

此时代码每 15 分钟运行一次,没有任何错误,但它基本上没有做任何事情,因为它没有将新事件转储到 HBase 表。检查点目录也是空的,而我假设必须在那里写入一些文件?

是否需要调整 readStream 函数才能从最新的检查点开始读取?

我真的很难理解有关此的 spark 文档。

先感谢您!