问题标签 [spark-streaming-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
85 浏览

maven - 提交 Spark 作业时,如何优先考虑 Maven 依赖项而不是 Spark 类路径?

我有 Hadoop、Spark 等的 Cloudera 发行版,其中 Spark-Kafka 版本为 0.8(即 spark-streaming-kafka-0-8_2.11)。

问题是,带有 Kafka 集成的 Apache Spark 0.8 版内置了 Kafka 0.8.2.1 版,我需要 0.10.0.1。

有没有办法解决这个问题?我不想使用 spark-streaming-kafka-0-10_2.11 因为它不是一个稳定的版本。

我尝试将它添加到我的 Maven 依赖项(使用 jar 打包),但类路径优先于我的 Maven 依赖项。

0 投票
0 回答
642 浏览

apache-spark - 火花流放慢

在我们的 spark 应用程序中,我们使用 Kafka 流并将数据存储到 Cassandra DB。

首先,我们在没有背压的情况下运行了流,并且遇到了一个奇怪的异常,其中处理时间是恒定的 ~ 1 分钟,但是调度延迟正在增加。以这种方式,队列堆积如山,最终使流崩溃。

任何想法为什么会发生这种情况?如果不是处理,什么会导致如此严重的延迟? 在此处输入图像描述

然后我们尝试使用背压(增加maxRatePerPartition)进行相同的设置,最初,一切运行良好。背压完成了它的节流工作,我们能够以大约100K/分钟的恒定速率进行处理。

然后几个小时后,发生了一些事情,速率迅速下降到5K / 分钟。处理时间只有 5-6 秒,没有调度延迟,但背压荒谬地将速率保持在 5k/分钟,并且从未增加。实际上,根本没有理由将速度降低到 5K。

我们的设置:

具有 1 个主节点和 2 个工作节点的 Spark 集群

0 投票
1 回答
647 浏览

python - 在 python 中的 kafka Direct Stream 中手动提交偏移量

我正在将一个用 scala 编写的流应用程序移植到 python。我想手动提交 DStream 的偏移量。这是在 scala 中完成的,如下所示:

但我无法在 python 中找到类似的 API。您能否指导我如何使用 python 客户端手动提交偏移量。

0 投票
1 回答
515 浏览

apache-spark - Spark连续处理模式不读取所有kafka主题分区

我正在结构化流中试验 Spark 的连续处理模式,并且我正在读取具有 2 个分区的 Kafka 主题,而 Spark 应用程序只有一个执行器和一个核心。

该应用程序是一个简单的应用程序,它只是从第一个主题中读取并在第二个主题上发布。问题是我的控制台消费者从第二个主题读取它只看到来自第一个主题的一个分区的消息。这意味着我的 Spark 应用程序只读取来自主题的一个分区的消息。

如何让我的 Spark 应用程序从主题的两个分区中读取?

笔记

我正在为可能遇到与我相同问题的人问这个问题

0 投票
1 回答
238 浏览

apache-spark - 使用 KafkaUtils.createDirectStream 在 Kafka 中存储消息偏移量

如果我使用 KafkaUtils.createDirectStream 读取消息,如何在 Kafka 中存储消息偏移量。每次应用程序关闭时,Kafka 都会丢失偏移值。然后它正在读取 auto.offset.reset 中提供的值(这是最新的)并且无法在应用程序的停止-启动间隔内读取消息。

0 投票
1 回答
32 浏览

spark-structured-streaming - 如何在 Spark 结构化流中获取特定日期的聚合数据

我有一个 spark 结构化的蒸汽作业,它从 kafka 读取流并将输出写入 HDFS。我的问题是我需要一整天的汇总结果,直到特定时间。由于火花结构化流不支持完整/更新模式,有没有办法实现相同?

如果我在上午 10 点获得数据,我需要一个汇总结果,直到当前日期上午 10 点...

有人可以帮助如何实现相同的目标吗?

0 投票
3 回答
2673 浏览

scala - 如何在 Spark 结构化流中包含 kafka 时间戳值作为列?

我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流模式的解决方案。我已经从 kafka 中提取了 value 字段并制作了数据框。我的问题是,我还需要获取时间戳字段(来自 kafka)以及其他列。

这是我当前的代码:

如何从 kafka 获取时间戳并与其他列一起添加为列?

0 投票
0 回答
278 浏览

apache-spark - Spark sql 在 Json 数据上使用 Kafka 流式传输:函数 from_json 无法解析来自 kafka 主题的多行 json

在这里,我将 json 数据从“test”主题发送到 kafka,将模式提供给 json,进行一些转换并在控制台上打印。这是代码: -

我通过两种方式向kafka发送数据:-

1)单行json:-

2)多行json:-

来自源的 Json 就像第二个。

从kafka读取流数据时如何处理json?我认为问题可能是 from_json 函数不理解多行 json。

0 投票
1 回答
589 浏览

apache-spark - 在 Spark Structured Streaming 中逐行拆分 Kafka 消息行

我想将 Spark Structured Streaming 作业中来自 Kafka 主题的消息读入数据帧。但是我在一个偏移量中获取整个消息,因此在数据帧中只有这条消息进入一行而不是多行。(在我的情况下是 3 行)

当我打印此消息时,我得到以下输出:

在此处输入图像描述

我想要在数据框中的 3 行中显示消息“Text1”、“Text2”和“Text3”,以便我可以进一步处理。

请帮我。

0 投票
1 回答
828 浏览

pyspark - 如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框

我正在阅读来自 kafka 主题的消息

当我从上述查询打印数据框时,我得到以下控制台输出。

如何从 DataStreamReader 创建一个数据框,以便我有一个包含列的数据框|key|channel| username| message|

我尝试按照 如何使用结构化流从 Kafka 读取 JSON 格式的记录中接受的答案?

但是,我进去Expected type 'StructField', got 'StructType' insteadfrom_json()