问题标签 [spark-streaming-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - Scala Spark 流式卡夫卡
我在 kafka 中创建了一个示例主题,我正在尝试使用以下脚本在 spark 中使用内容:
我还包含了执行代码所需的库。
我有以下错误,请告诉我如何解决这个问题。
scala - Apache Spark 流式传输 kafka API 与 kinesis API
我有一个 scala spark 应用程序,我需要根据应用程序配置在 kafka 流和 kinesis 之间切换。
用于 kafka 流 (spark-streaming-kafka-0-10_2.11) 和 kinesis 流 (spark-streaming-kinesis-asl_2.11) 的 spark APIInputDStream
在创建流时返回一个,但值类型不同。
Kafka 流创建返回InputDStream[ConsumerRecord[String, String]]
,而 Kinesis 流创建返回InputDStream[Array[Byte]]
是否有任何API
返回泛型InputDStream
而不考虑 kafka 或 kinesis,以便我的流处理可以具有通用实现,而不必为 kafka 和 kinesis 编写单独的代码。
我尝试将两个流都分配给 a InputDStream[Any]
,但这不起作用。
感谢任何关于如何做到这一点的想法。
java - Spark Streaming Kafka Stream 批量执行
我是火花流的新手,我有一个关于它的使用的一般性问题。我目前正在实现一个从 Kafka 主题流式传输数据的应用程序。
使用应用程序只运行一次批处理是否是一种常见的场景,例如一天结束,从主题中收集所有数据,进行一些聚合和转换等?
这意味着在使用 spark-submit 启动应用程序后,所有这些内容将在一批中执行,然后应用程序将被关闭。还是构建火花流以连续批量运行无休止和永久的流数据?
apache-spark - 避免在 Spark Streaming 中为空分区写入文件
我有 Spark Streaming 作业,它从 kafka 分区(每个分区一个执行程序)读取数据。
我需要将转换后的值保存到 HDFS,但需要避免创建空文件。
我尝试使用 isEmpty,但是当并非所有分区都为空时,这无济于事。
由于性能下降,PS 重新分区不是可接受的解决方案。
java - 第一批后关闭 Spark Streaming Context(尝试检索 kafka 偏移量)
我正在尝试为我的 Spark Batch 作业检索 Kafka 偏移量。检索偏移量后,我想关闭流上下文。
我尝试在流上下文中添加一个流侦听器,并实现 onBatchCompleted 方法以在作业完成后关闭流,但我收到异常"Cannot stop StreamingContext within listener bus thread"。
有针对这个的解决方法吗?我正在尝试检索偏移量以调用 KafkaUtils.createRDD(sparkContext, kafkaProperties, OffsetRange[], LocationStrateg)
非常感谢stackoverflow-ers :) 我已经尝试搜索可能的解决方案,但到目前为止还没有成功
编辑:我使用 KafkaConsumer 来获取分区信息。获得分区信息后,我创建一个 TopicPartition pojos 列表并调用 position 和 endOffsets 方法分别获取我的 groupId 的当前位置和结束位置。
scala - 使用 scala 从 kafka 主题进行 Spark 流式传输
我是 scala/Spark 开发的新手。我使用 sbt 和 scala 从 Kafka 主题创建了一个简单的流应用程序。我有以下代码
构建.sbt
WeatherDataStream.scala
我已经使用命令创建了 jar 文件
sbt 包
并使用命令运行应用程序
./spark-submit --master spark://myserver:7077 --class com.supergloo.WeatherDataStream /home/Manaf/kafka-streaming_2.11-1.0.jar
但我有这样的错误
根据我的堆栈溢出分析,我了解了使用汇编命令创建 jar
sbt 组装
但是在执行汇编命令时出现如下错误
json - Kafka 接收到的 Spark 中的 JSON 到 Dataframe 的数组
我正在使用 Spark Structured Streaming 在 Scala 中编写一个 Spark 应用程序,该应用程序从 Kafka 接收一些以 JSON 样式格式化的数据。此应用程序可以接收以这种方式格式化的单个或多个 JSON 对象:
我试图定义一个 StructType 像:
但它不起作用。我解析 JSON 的实际代码:
我想在像这样的数据框中获取这个 JSON 对象
任何人都可以帮助我吗?谢谢!
apache-spark - Spark Kafka 在 spark 2.3.0 中使用 python 流式传输
我最近升级到 Spark 2.3.0。我有一个现有的 spark 作业,它曾经在 spark 2.2.0 上运行。我正面临 AbstractMethodError 的 Java 异常
我的简单代码:
这适用于 Spark 2.2.0
使用 Spark spark 2.3.0,我得到以下异常:
我正在使用 spark-streaming-kafka-0-8_2.11-2.3.0.jar
带有 --packages 选项的 spark-submit 命令。我尝试spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
与 --package 和 --jars 选项一起使用。
我在这里遵循指南:https ://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html
spark streaming kafka 0-8 版在 2.3.0 中已弃用,但根据文档,它仍然存在。
我的命令看起来像:
当然,Spark 的 scala 底层代码发生了一些变化。
有没有人遇到过同样的问题?
scala - 开始流式传输时引发流式传输 kafka 问题
我正在尝试使用 spark2-shell 从 kafka 消费者读取数据。
请在下面找到我的代码。
我以以下方式启动我的 spark2-shell:
请找到我的以下代码:
但是在我开始使用 spark-streaming 之后,这里什么都没有出现。
请建议我绕过此问题的方法。
提前致谢。
apache-spark - Spark Streaming 指定起始偏移量
我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。
假设我想重新处理以下批次的数据。
主题分区1-{1000,2000} 主题分区2-{500-600}
下面是我拥有的代码片段,我可以在其中指定起始偏移量。
但是,我想知道他们是否也可以指定结束偏移量,例如结构化流批处理模式。
所以本质上,它应该处理这个小批量并停止工作流程。
注意:我不想在这个用例中使用结构化流。只想使用 DStreams。