问题标签 [spark-kafka-integration]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 如何使用 Spark 将多个 Kafka 主题读取到单独的数据帧中
我正在将数据发布到 2 个名为“akr”和“akr2”的 afka 主题。如何在单独的数据框中读取它们?
scala - 如何在 spark 3.0 结构化流中使用 kafka.group.id 和检查点继续从重启后停止的 Kafka 读取?
基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。
如何通过 Spark Structured Streaming 确保 kafka 数据摄取不丢失数据?
但是,我尝试了 spark 3.0 中的设置,如下所示。
使用以下 spark-submit 命令在独立模式下测试 spark 作业,但是当我在 AWS EMR 中以集群模式部署时存在同样的问题。
然后,我开始流式作业以从 Kafka 主题中读取流式数据。一段时间后,我杀死了火花工作。然后,我等待 1 小时重新开始工作。如果我理解正确,新的流数据应该从我终止 spark 作业时的偏移量开始。但是,它仍然以最新的偏移量开始,这在我停止作业期间导致数据丢失。
我是否需要配置更多选项以避免数据丢失?还是我对 Spark 3.0 有什么误解?谢谢!
问题解决了
这里的关键问题是检查点必须专门添加到查询中。仅仅为 SparkContext 添加检查点是不够的。添加检查点后,它正在工作。在checkpoint文件夹中,会创建一个offset子文件夹,里面包含offset文件,0,1,2,3....对于每个文件,都会显示不同分区的offset信息。
一种建议是将检查点放在一些外部存储上,例如 s3。即使您需要重建 EMR 集群本身以防万一,它也可以帮助恢复偏移量。
apache-spark - Apache Spark 与 Kafka 的集成
我正在学习关于 Kafka 和 Spark 的 Udemy 课程,并且正在学习 Apache Spark 与 Kafka 的集成
下面是apache spark的代码
以下是 pom.xml 文件的内容
但是,当我运行代码时,我遇到了无法解决的错误。我在 MX Linux 上使用 openjdk 8 和 spark 3。谢谢
apache-spark - Kafka 中的 Spark 偏移管理
我正在使用 Spark Structured Streaming(版本 2.3.2)。我需要从 Kafka Cluster 读取并写入 Kerberized Kafka。这里我想在记录写入 Kerberized Kafka 后使用 Kafka 作为偏移检查点。
问题:
- 我们可以使用 Kafka 进行检查点来管理偏移量还是只需要使用 HDFS/S3?
请帮忙。
apache-spark - Spark 3结构化流在Kafka源中使用maxOffsetsPerTrigger和Trigger.Once
我们需要在结构化流maxOffsetsPerTrigger
中使用 Kafka 源,Trigger.Once()
但基于这个问题,它似乎allAvailable
在 spark 3 中读取。在这种情况下,有没有办法实现速率限制?
这是 spark 3 中的示例代码:
apache-spark - 如何使用 subscribePattern 订阅新主题?
我正在使用带有 Kafka 的 Spark Structured 流,并且主题被订阅为模式:
option("subscribePattern", "topic.*")
一旦我开始工作并列出了一个新主题topic.new_topic
,该工作就不会自动开始收听新主题,它需要重新启动。
有没有办法在不重新启动作业的情况下自动订阅新模式?
火花:3.0.0
scala - Spark Streaming 和 Kafka 集成中的并行任务数
我对 Spark Streaming 很陌生。我有一些基本的疑问。有人可以帮我澄清一下吗:
我的消息大小是标准的。每条消息 1Kb。
主题分区数为 30,并使用 dstream 方法从 kafka 消费消息。
为 spark 作业分配的核心数为:
(spark.max.cores=6|spark.executor.cores=2)
据我了解,Kafka 分区数 = RDD 分区数:
}
另外由于我给了 6 个核心,从 kafka 将并行消耗多少个分区
问题:一次是6个分区还是一次
30/6 = 5个分区?有人可以详细说明这在 dstream 方法中是如何工作的。
scala - 使用 Spark/Kafka 流处理应用程序进行日志记录
我是使用 Spark 和 Kafka 集成在 Scala 中工作的新手。但是,我遇到了记录问题。我尝试了许多不同的日志库,但它们都从 Spark 返回相同的错误。
错误如下:Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我的代码如下:
我想知道是否有办法打印或记录结果。问题是该writeStream.start()
函数仅适用于数据帧,我无法让它打印字符串。任何帮助将非常感激。
apache-spark-sql - 如何通过 Spark 从 Kafka 获取至少 N 条日志?
在 Spark 流式传输中,我会在它们到达时获取日志。但我想一次获得至少 N 个日志。如何实现?
从这个答案来看,Kafka 中似乎有这样的实用程序,但Spark 中似乎没有这样的实用程序来实现它。
apache-spark - kafka-consumer-groups 命令不显示火花结构化流应用程序(消费者)的 LAG 和 CURRENT-OFFSET
我有一个从 kafka 消费的 spark 结构化流应用程序,对于这个应用程序,我想监控消费者滞后。我正在使用以下命令来检查消费者滞后。但是我没有得到 CURRENT-OFFSET ,因此 LAG 也是空白的。这是预期的吗?它适用于其他基于 python 的消费者。
命令
输出