问题标签 [spark-streaming-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
725 浏览

apache-spark - Scala Spark 流式卡夫卡

我在 kafka 中创建了一个示例主题,我正在尝试使用以下脚本在 spark 中使用内容:

我还包含了执行代码所需的库。

我有以下错误,请告诉我如何解决这个问题。

0 投票
0 回答
190 浏览

scala - Apache Spark 流式传输 kafka API 与 kinesis API

我有一个 scala spark 应用程序,我需要根据应用程序配置在 kafka 流和 kinesis 之间切换。

用于 kafka 流 (spark-streaming-kafka-0-10_2.11) 和 kinesis 流 (spark-streaming-kinesis-asl_2.11) 的 spark APIInputDStream在创建流时返回一个,但值类型不同。

Kafka 流创建返回InputDStream[ConsumerRecord[String, String]],而 Kinesis 流创建返回InputDStream[Array[Byte]]

是否有任何API返回泛型InputDStream而不考虑 kafka 或 kinesis,以便我的流处理可以具有通用实现,而不必为 kafka 和 kinesis 编写单独的代码。

我尝试将两个流都分配给 a InputDStream[Any],但这不起作用。

感谢任何关于如何做到这一点的想法。

0 投票
1 回答
555 浏览

java - Spark Streaming Kafka Stream 批量执行

我是火花流的新手,我有一个关于它的使用的一般性问题。我目前正在实现一个从 Kafka 主题流式传输数据的应用程序。

使用应用程序只运行一次批处理是否是一种常见的场景,例如一天结束,从主题中收集所有数据,进行一些聚合和转换等?

这意味着在使用 spark-submit 启动应用程序后,所有这些内容将在一批中执行,然后应用程序将被关闭。还是构建火花流以连续批量运行无休止和永久的流数据?

0 投票
1 回答
433 浏览

apache-spark - 避免在 Spark Streaming 中为空分区写入文件

我有 Spark Streaming 作业,它从 kafka 分区(每个分区一个执行程序)读取数据。
我需要将转换后的值保存到 HDFS,但需要避免创建空文件。
我尝试使用 isEmpty,但是当并非所有分区都为空时,这无济于事。

由于性能下降,PS 重新分区不是可接受的解决方案。

0 投票
1 回答
610 浏览

java - 第一批后关闭 Spark Streaming Context(尝试检索 kafka 偏移量)

我正在尝试为我的 Spark Ba​​tch 作业检索 Kafka 偏移量。检索偏移量后,我想关闭流上下文。

我尝试在流上下文中添加一个流侦听器,并实现 onBatchCompleted 方法以在作业完成后关闭流,但我收到异常"Cannot stop StreamingContext within listener bus thread"

有针对这个的解决方法吗?我正在尝试检索偏移量以调用 KafkaUtils.createRDD(sparkContext, kafkaProperties, OffsetRange[], LocationStrateg)

非常感谢stackoverflow-ers :) 我已经尝试搜索可能的解决方案,但到目前为止还没有成功

编辑:我使用 KafkaConsumer 来获取分区信息。获得分区信息后,我创建一个 TopicPartition pojos 列表并调用 position 和 endOffsets 方法分别获取我的 groupId 的当前位置和结束位置。

0 投票
2 回答
1057 浏览

scala - 使用 scala 从 kafka 主题进行 Spark 流式传输

我是 scala/Spark 开发的新手。我使用 sbt 和 scala 从 Kafka 主题创建了一个简单的流应用程序。我有以下代码

构建.sbt

WeatherDataStream.scala

我已经使用命令创建了 jar 文件

sbt 包

并使用命令运行应用程序

./spark-submit --master spark://myserver:7077 --class com.supergloo.WeatherDataStream /home/Manaf/kafka-streaming_2.11-1.0.jar

但我有这样的错误

根据我的堆栈溢出分析,我了解了使用汇编命令创建 jar

sbt 组装

但是在执行汇编命令时出现如下错误

0 投票
3 回答
4209 浏览

json - Kafka 接收到的 Spark 中的 JSON 到 Dataframe 的数组

我正在使用 Spark Structured Streaming 在 Scala 中编写一个 Spark 应用程序,该应用程序从 Kafka 接收一些以 JSON 样式格式化的数据。此应用程序可以接收以这种方式格式化的单个或多个 JSON 对象:

我试图定义一个 StructType 像:

但它不起作用。我解析 JSON 的实际代码:

我想在像这样的数据框中获取这个 JSON 对象

任何人都可以帮助我吗?谢谢!

0 投票
1 回答
789 浏览

apache-spark - Spark Kafka 在 spark 2.3.0 中使用 python 流式传输

我最近升级到 Spark 2.3.0。我有一个现有的 spark 作业,它曾经在 spark 2.2.0 上运行。我正面临 AbstractMethodError 的 Java 异常

我的简单代码:

这适用于 Spark 2.2.0

使用 Spark spark 2.3.0,我得到以下异常:

我正在使用 spark-streaming-kafka-0-8_2.11-2.3.0.jar带有 --packages 选项的 spark-submit 命令。我尝试spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar与 --package 和 --jars 选项一起使用。

我在这里遵循指南:https ://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html

spark streaming kafka 0-8 版在 2.3.0 中已弃用,但根据文档,它仍然存在。

我的命令看起来像:

当然,Spark 的 scala 底层代码发生了一些变化。

有没有人遇到过同样的问题?

0 投票
1 回答
348 浏览

scala - 开始流式传输时引发流式传输 kafka 问题

我正在尝试使用 spark2-shell 从 kafka 消费者读取数据。

请在下面找到我的代码。

我以以下方式启动我的 spark2-shell:

请找到我的以下代码:

但是在我开始使用 spark-streaming 之后,这里什么都没有出现。

请建议我绕过此问题的方法。

提前致谢。

0 投票
1 回答
655 浏览

apache-spark - Spark Streaming 指定起始偏移量

我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。

假设我想重新处理以下批次的数据。

主题分区1-{1000,2000} 主题分区2-{500-600}

下面是我拥有的代码片段,我可以在其中指定起始偏移量。

但是,我想知道他们是否也可以指定结束偏移量,例如结构化流批处理模式。

所以本质上,它应该处理这个小批量并停止工作流程。

注意:我不想在这个用例中使用结构化流。只想使用 DStreams。