问题标签 [spark-streaming-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4696 浏览

apache-spark - Spark Streaming Kafka 背压

我们有一个 Spark Streaming 应用程序,它从接收器中的 Kafka 队列中读取数据,并进行一些转换并输出到 HDFS。批处理间隔为 1 分钟,我们已经调整了背压和spark.streaming.receiver.maxRate参数,因此大部分时间都可以正常工作。

但是我们还有一个问题。当 HDFS 完全宕机时,批处理作业会长时间挂起(假设 HDFS 不工作 4 小时,作业会挂起 4 小时),但接收方不知道作业未完成,因此它仍在接收接下来 4 小时的数据。这导致OOM异常,整个应用程序宕机,我们丢失了很多数据。

所以,我的问题是:是否有可能让接收者知道工作没有完成,所以它会收到更少(甚至没有)的数据,当工作完成时,它会开始接收更多的数据来赶上。在上述情况下,当HDFS down时,receiver会从kafka中读取较少的数据,并且接下来4小时产生的block真的很小,receiver和整个应用程序都没有down,在HDFS ok后,receiver会读取更多数据并开始迎头赶上。

0 投票
1 回答
917 浏览

apache-spark - Spark Streaming Kafka createDirectStream - Spark UI 将输入事件大小显示为零

我已经使用 createDirectStream 实现了 Spark Streaming。我的 Kafka 生产者每秒向具有两​​个分区的主题发送几条消息。

在 Spark 流方面,我每秒读取一次 kafka 消息,并且我将它们以 5 秒的窗口大小和频率窗口化。

Kafka 消息得到了正确处理,我看到了正确的计算和打印。

但在 Spark Web UI 中,在 Streaming 部分下,每个窗口的事件数显示为零。请看这张图片:

显示零事件的 Spark UI

我很困惑为什么它显示为零,它不应该显示被馈送到 Spark Stream 的 Kafka 消息的数量吗?

更新:

当我使用 groupByKeyAndWindow() api 时,似乎发生了这个问题。当我从我的代码中注释掉这个 api 使用时,Spark Streaming UI 开始正确报告 Kafka 事件输入大小。

知道为什么会这样吗?这可能是 Spark Streaming 的缺陷吗?

我正在使用 Cloudera CDH:5.5.1,Spark:1.5.0,Kafka:KAFKA-0.8.2.0-1.kafka1.4.0.p0.56

在此处输入图像描述

0 投票
6 回答
25821 浏览

apache-spark - Kafka Producer - 找不到 org.apache.kafka.common.serialization.StringSerializer

我创建了一个简单的 Kafka Producer & Consumer。我正在使用 kafka_2.11-0.9.0.0。这是我的生产者代码。

在启动捆绑包时,我面临以下错误:

我尝试设置key.serializervalue.serializer如下:

也喜欢,但仍然得到同样的错误。我在这里做错了什么。

0 投票
0 回答
1172 浏览

apache-kafka - 火花流 - 卡夫卡 - java.nio.BufferUnderflowException

在尝试通过 Spark 流(Kafka 直接 API)使用来自 Kafka 的消息时,我遇到了以下错误。这在使用 Spark 独立集群管理器时可以正常工作。我们刚刚切换到使用 Cloudera 5.7 使用 Yarn 来管理 Spark 集群并开始看到以下错误。

一些细节: - Spark 1.6.0 - 使用 Kafka 直接流 API - Kafka 代理版本 (0.8.2.1) - Yarn 执行器的类路径中的 Kafka 版本 (0.9) - 不受 Cloudera 管理的 Kafka 代理

我看到使用独立集群管理器和纱线之间的唯一区别是在消费者端使用的 Kafka 版本。(0.8.2.1 对 0.9)

试图弄清楚版本不匹配是否真的是一个问题?如果确实如此,除了将 Kafka 代理升级到 0.9 之外,还有什么解决方法。(最终是,但现在不是)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 200.0 failed 4 times, most recent failure: Lost task 0.3 in stage 200.0 (TID 203,..): java.nio.BufferUnderflowException at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151) at java.nio.ByteBuffer.get(ByteBuffer.java:715) at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) at kafka.api.TopicData$.readFrom(FetchResponse.scala:96) at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170) at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942) at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

0 投票
1 回答
317 浏览

apache-spark - 火花流 + kafka 兼容性问题

火花流与 0.8.2.1 以上的 kafka 版本兼容吗?编写自定义接收器是使火花流使用 0.9 以上的 kafka 版本的唯一选择吗?

0 投票
1 回答
185 浏览

apache-spark - 卡夫卡火花流集成

我已经在我的系统中使用 maven 设置了 Kafka 和 spark 流。除了在生产者中输入内容并在消费者中看到之外,我想知道任何可以帮助我进行更广泛操作的建议。

如何创建一个源,将 json 或 avro 等数据连续放入 Kafka 生产者,这样我就可以用 spark 处理它并从中执行一些操作。需要建议我该如何设计这个

0 投票
1 回答
4566 浏览

apache-spark - 火花卡夫卡流错误 - “java.lang.NoClassDefFoundError:org/apache/spark/streaming/kafka/KafkaUtils

我正在 Eclipse 中编写一个简单的 kafka - spark 流代码,以使用 spark 流使用来自 kafka 代理的消息。下面是代码,当我尝试从 Eclipse 运行代码时收到错误消息。

我还确保依赖 jars 就位,请帮助摆脱这个错误

对象 spark_kafka_streaming {

例外:

在 org.firststream.spark_kakfa.spark_kafka_streaming 的 org.firststream.spark_kakfa.spark_kafka_streaming$.main(spark_kafka_streaming.scala:30) 处的线程“main”java.lang.NoClassDefFoundError 中的异常:org/apache/spark/streaming/kafka/KafkaUtils$ .main(spark_kafka_streaming.scala) 引起:java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) ... 还有 2 个

依赖项:

0 投票
1 回答
679 浏览

apache-spark - spark流+kafka sbt编译

我有一个火花流+卡夫卡的例子。它在 IDE 中运行良好。但是当我尝试从控制台通过 SBT 编译它时,比如sbt compile。有错误。

主类:

错误信息:

sbt:

您有解决方法的想法吗?

0 投票
1 回答
3693 浏览

apache-spark - 如何为 Spark Streaming 定义 Kafka(数据源)依赖项?

我正在尝试使用 spark-streaming2.0.0 使用 kafka 0.8 主题,我正在尝试识别我尝试在 build.sbt 文件中使用这些依赖项所需的依赖项

当我运行 sbt 包时,我得到所有这三个 jar 的未解决依赖项,

但是这些罐子确实存在

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.0.0

请帮助调试这个问题,我是 Scala 的新手,所以如果我做的不对,请告诉我

0 投票
2 回答
5133 浏览

java - Spark Streaming Kafka 消费者

我正在尝试设置一个 Spark Streaming 简单应用程序,它将读取来自 Kafka 主题的消息。

经过大量工作,我处于这个阶段,但得到了下面显示的例外情况。

代码:

哪个抛出:

出于绝望,我尝试连接到 Zookeeper:

但这会引发:

相关的依赖是:

我想问:

我应该连接到 Kafka 代理还是 Zookeeper 服务器?

我在我的代码中做错了什么,无法连接/收听传入的消息?