问题标签 [spark-kafka-integration]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
33 浏览

apache-spark - 如何使用 Spark 将多个 Kafka 主题读取到单独的数据帧中

我正在将数据发布到 2 个名为“akr”和“akr2”的 afka 主题。如何在单独的数据框中读取它们?

0 投票
1 回答
1933 浏览

scala - 如何在 spark 3.0 结构化流中使用 kafka.group.id 和检查点继续从重启后停止的 Kafka 读取?

基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。

如何为火花结构化流指定kafka消费者的组ID?

如何通过 Spark Structured Streaming 确保 kafka 数据摄取不丢失数据?

但是,我尝试了 spark 3.0 中的设置,如下所示。

使用以下 spark-submit 命令在独立模式下测试 spark 作业,但是当我在 AWS EMR 中以集群模式部署时存在同样的问题。

然后,我开始流式作业以从 Kafka 主题中读取流式数据。一段时间后,我杀死了火花工作。然后,我等待 1 小时重新开始工作。如果我理解正确,新的流数据应该从我终止 spark 作业时的偏移量开始。但是,它仍然以最新的偏移量开始,这在我停止作业期间导致数据丢失。

我是否需要配置更多选项以避免数据丢失?还是我对 Spark 3.0 有什么误解?谢谢!

问题解决了

这里的关键问题是检查点必须专门添加到查询中。仅仅为 SparkContext 添加检查点是不够的。添加检查点后,它正在工作。在checkpoint文件夹中,会创建一个offset子文件夹,里面包含offset文件,0,1,2,3....对于每个文件,都会显示不同分区的offset信息。

一种建议是将检查点放在一些外部存储上,例如 s3。即使您需要重建 EMR 集群本身以防万一,它也可以帮助恢复偏移量。

0 投票
1 回答
434 浏览

apache-spark - Apache Spark 与 Kafka 的集成

我正在学习关于 Kafka 和 Spark 的 Udemy 课程,并且正在学习 Apache Spark 与 Kafka 的集成

下面是apache spark的代码

以下是 pom.xml 文件的内容

但是,当我运行代码时,我遇到了无法解决的错误。我在 MX Linux 上使用 openjdk 8 和 spark 3。谢谢

0 投票
1 回答
851 浏览

apache-spark - Kafka 中的 Spark 偏移管理

我正在使用 Spark Structured Streaming(版本 2.3.2)。我需要从 Kafka Cluster 读取并写入 Kerberized Kafka。这里我想在记录写入 Kerberized Kafka 后使用 Kafka 作为偏移检查点。

问题:

  1. 我们可以使用 Kafka 进行检查点来管理偏移量还是只需要使用 HDFS/S3?

请帮忙。

0 投票
2 回答
1230 浏览

apache-spark - Spark 3结构化流在Kafka源中使用maxOffsetsPerTrigger和Trigger.Once

我们需要在结构化流maxOffsetsPerTrigger中使用 Kafka 源,Trigger.Once()但基于这个问题,它似乎allAvailable在 spark 3 中读取。在这种情况下,有没有办法实现速率限制?

这是 spark 3 中的示例代码:

0 投票
1 回答
321 浏览

apache-spark - 如何使用 subscribePattern 订阅新主题?

我正在使用带有 Kafka 的 Spark Structured 流,并且主题被订阅为模式:

option("subscribePattern", "topic.*")

一旦我开始工作并列出了一个新主题topic.new_topic,该工作就不会自动开始收听新主题,它需要重新启动。

有没有办法在不重新启动作业的情况下自动订阅新模式?

火花:3.0.0

0 投票
1 回答
286 浏览

scala - Spark Streaming 和 Kafka 集成中的并行任务数

我对 Spark Streaming 很陌生。我有一些基本的疑问。有人可以帮我澄清一下吗:

  1. 我的消息大小是标准的。每条消息 1Kb。

  2. 主题分区数为 30,并使用 dstream 方法从 kafka 消费消息。

  3. 为 spark 作业分配的核心数为:

    (spark.max.cores=6|spark.executor.cores=2)

  4. 据我了解,Kafka 分区数 = RDD 分区数:

    }

  5. 另外由于我给了 6 个核心,从 kafka 将并行消耗多少个分区

    问题:一次是6个分区还是一次
    30/6 = 5个分区?有人可以详细说明这在 dstream 方法中是如何工作的。

0 投票
0 回答
55 浏览

scala - 使用 Spark/Kafka 流处理应用程序进行日志记录

我是使用 Spark 和 Kafka 集成在 Scala 中工作的新手。但是,我遇到了记录问题。我尝试了许多不同的日志库,但它们都从 Spark 返回相同的错误。

错误如下:Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我的代码如下:

我想知道是否有办法打印或记录结果。问题是该writeStream.start()函数仅适用于数据帧,我无法让它打印字符串。任何帮助将非常感激。

0 投票
1 回答
60 浏览

apache-spark-sql - 如何通过 Spark 从 Kafka 获取至少 N 条日志?

在 Spark 流式传输中,我会在它们到达时获取日志。但我想一次获得至少 N 个日志。如何实现?

这个答案来看,Kafka 中似乎有这样的实用程序,但Spark 中似乎没有这样的实用程序来实现它。

0 投票
1 回答
346 浏览

apache-spark - kafka-consumer-groups 命令不显示火花结构化流应用程序(消费者)的 LAG 和 CURRENT-OFFSET

我有一个从 kafka 消费的 spark 结构化流应用程序,对于这个应用程序,我想监控消费者滞后。我正在使用以下命令来检查消费者滞后。但是我没有得到 CURRENT-OFFSET ,因此 LAG 也是空白的。这是预期的吗?它适用于其他基于 python 的消费者。

命令

输出