问题标签 [spark-streaming-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
376 浏览

apache-spark - Spark Streaming readStream 无法从安全的 Kafka (EventStreams) 中读取

我正在尝试将数据从程序发送到安全的 Kafka 集群(IBM Cloud 上的 EventStreams - Cloud Foundry Services),然后在我的消费者应用程序(即火花流)中,我试图从相同的卡夫卡源。

这是Properties我在生产者内部设置的:

这是我用来将数据发送到 Kafka 集群的代码:

我能够确认这确实将数据发送到 Kafka 集群,因为我能够使用 IBM 云提供的 Grafana 指标查看进入集群的数据。

现在在我的 spark 流代码中,这是我尝试从 kafka 源中读取的方式:

其次是:

我不确定为什么,但我的 Spark Streaming 根本无法从源读取数据。当我启动 spark 流程序时,它会在输出中显示:

有一次,当我再次运行我的制作人时,火花流仍然存在于0 0. 我不确定我在这里写错了什么。

编辑:让消费者运行超过 7 个小时,仍然没有变化

0 投票
1 回答
262 浏览

apache-spark - 使用 spark-streaming 将数据发布到 kafka 主题时重复

我有 spark-streaming 应用程序,它使用来自 topic1 的数据并解析它,然后将相同的记录发布到 2 个进程中,一个是 topic2,另一个是 hive 表。在将数据发布到 kafka topic2 时,我看到重复项,但在配置单元表中看不到重复项

使用火花 2.2,卡夫卡 0.10.0

有人可以帮忙吗,

预计 kafka topic2 中没有重复项。

0 投票
1 回答
145 浏览

apache-spark - 无法使用火花流从 kafka 主题中读取数据

我正在尝试使用火花流从 kafka 主题中读取数据。我能够将消息生成到 kafka 主题中,但是在使用 spark 流从主题中读取数据时,我收到如下错误消息:

下面是代码:

0 投票
1 回答
1229 浏览

scala - spark-submit 错误原因:java.lang.ClassNotFoundException: kafka.DefaultSource

在我的 spark 程序中,我有以下代码:

这是我的build.sbt文件:

如果我删除providedfrom libraryDependencies,我可以在 IntelliJ IDEA 中成功运行 scala 代码。

现在我这样做sbt assembly,然后当我尝试spark-submit使用以下命令在内部运行相同的程序时:

(PS.consumer-example.jar是我做完后得到的JAR sbt assembly

我收到此错误:

从错误日志来看,WeatherDataStream.scala:73是指load()上面写的代码。当这段代码在 IntelliJ中工作时,我不明白为什么它不能工作spark-submit

0 投票
0 回答
2143 浏览

apache-spark - 找不到 LoginModule 类:org.apache.kafka.common.security.plain.PlainLoginModule

环境:Spark 2.3.0、Scala 2.11.12、Kafka(无论最新版本是什么)

我有一个安全的 Kafka 系统,我正在尝试将我的 Spark Streaming Consumer 连接到该系统。下面是我的build.sbt文件:

请注意,这是 Spark 2.3.0,我无法更改我的 Spark 版本。

现在这是我尝试将 Spark Streaming 消费者连接到我的安全 Kafka 的部分代码:

当我尝试运行这个程序时,会抛出以下错误:

错误日志中的>>指向load()上面的代码片段。这几天我一直在尝试解决这个问题,但没有取得太大的成功。

0 投票
1 回答
793 浏览

java - 如何在 jar 文件的 spark-submit 中修复日志记录和版本兼容性

我正在尝试提交一个 jar 文件以在 spark 引擎上执行。我正在尝试将 spark 与 kafka 集成并使用 eclipse 构建和导出示例代码的 jar 文件https://github.com/apache/spark/tree/v2.1.1/examples

我有两个例外:

1) 版本不兼容。我正在使用 Scala 2.11.12、kafka_2.12-2.2.0、spark-2.4.3、java 版本“11.0.2”2019-01-15 LTS、spark-streaming-kafka-0-8-assembly_2.11 -2.4.3 我不知道该使用哪些版本,哪些是兼容的?

2)AbstractMethodError:接收器类 org.apache.spark.streaming.kafka.KafkaReceiver 没有定义或继承解析方法抽象的实现。

在 Eclipse 下,我尝试将 spark-2.4.3/jars 中的所有 jars 添加为外部 jars。我使用了命令:

./bin/spark-submit --files /home/symlab/software/spark-2.4.3/conf/log4j.properties --conf spark.driver.extraJavaOptions='-Dlog4j.configuration=file:/home/symlab/软件/spark-2.4.3/conf/log4j.properties' --class org.apache.spark.examples.streaming.JavaKafkaWordCount --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4 .3 --master local[2] /home/symlab/software/JavaKafkaWordCount.jar localhost:2181 test-consumer-group streams-plaintext-input 1

.........................................

19/06/17 18:01:06 INFO ReceiverSupervisorImpl:启动接收器 0

19/06/17 18:01:06 INFO ReceiverTracker:所有接收器已成功注销 19/06/17 18:01:06 INFO ReceiverTracker:ReceiverTracker 停止 19/06/17 18:01:06 INFO JobGenerator:停止 JobGenerator立即地

...................................................

19/06/17 18:01:06 INFO SparkContext:成功停止 SparkContext 19/06/17 18:01:06 INFO ShutdownHookManager:关闭挂钩调用 19/06/17 18:01:06 INFO ShutdownHookManager:删除目录 /tmp/ spark-6728496c-68f7-427f-b0e8-11da56c6caec 19/06/17 18:01:06 INFO ShutdownHookManager:删除目录 /tmp/spark-1ca3fa84-2c38-4855-a636-f77cfd22966b

0 投票
2 回答
5122 浏览

apache-spark - 使用 Kafka 的 Spark 结构化流不尊重startingOffset="earliest"

我已经设置了 Spark Structured Streaming (Spark 2.3.2) 以从 Kafka (2.0.0) 读取。如果消息在 Spark 流作业启动之前进入主题,我将无法从主题的开头消费。这是 Spark 流的预期行为,它忽略了在 Spark Stream 作业初始运行之前产生的 Kafka 消息(即使使用 .option("stratingOffsets","earliest"))?

重现步骤

  1. 在开始流式作业之前,创建test主题(单个代理,单个分区)并为主题生成消息(在我的示例中为 3 条消息)。

  2. 使用以下命令启动 spark-shell:spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/

  3. 执行下面的 spark scala 代码。

预期与实际产出

我希望流从 offset=1 开始。但是,它从 offset=3 开始读取。可以看到,kafka 客户端实际上是在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.

我可以看到火花流处理我在开始流式传输作业后生成的消息。

这是 Spark 流的预期行为,它忽略了在 Spark Stream 作业初始运行之前产生的 Kafka 消息(即使使用.option("stratingOffsets","earliest"))?

火花批处理模式

我能够确认批处理模式从一开始就读取 - 所以 Kafka 保留配置没有问题

0 投票
1 回答
3073 浏览

apache-spark - DStreams:在foreachRDD中创建然后在foreachPartition内部修改的变量在foreachPartition之外重置一次?

我在 kafka 中有一堆消息,并使用火花流来处理这些消息。

当我的代码无法插入我的数据库时,我试图捕捉到这些消息,然后将这些消息重新插入 Kafka,以便稍后处理它们。

为了解决这个问题,我在我的 foreachRDD 函数中创建了一个名为“success”的变量。然后,当我尝试更新到数据库时,我返回一个成功插入的布尔值。我在测试期间注意到的是,当我尝试在 foreachPartition 期间插入时,这似乎效果不佳。当我离开 foreachPartition 函数时,成功值似乎被“重置”了。

然后我的日志输出显示了这一点!

2019-06-24 20:26:37 [INFO] 插入成功:false 2019-06-24 20:26:37 [INFO] 记录成功为 false。目前其设置为:true 2019-06-24 20:26:37 [INFO] 成功记录为 false。目前其设置为:false 2019-06-24 20:26:37 [INFO] 从所有分区成功插入数据库:true

即使在第三个日志中它说当前“成功”设置为 false,但是当我离开 foreachPartition 时,我再次记录它并将其设置回 true。

谁能解释为什么?或者提出不同的方法?

0 投票
2 回答
3541 浏览

scala - 如何对 Spark Structured Streaming 进行单元测试?

我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,并使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。

我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体新手。

0 投票
2 回答
1395 浏览

apache-spark - 反序列化来自 Kafka 主题的 Spark 结构化流数据

我正在使用 Kafka 2.3.0 和 Spark 2.3.4。我已经构建了一个 Kafka 连接器,它读取 CSV 文件并将 CSV 中的一行发布到相关的 Kafka 主题。这条线是这样的:“201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect”。CSV 包含 1000 条这样的行。连接器能够成功地将它们发布在主题上,我也能够在 Spark 中获取消息。我不确定如何将该消息反序列化到我的架构中?请注意,消息是无标题的,因此 kafka 消息中的关键部分为空。值部分包括上面的完整CSV 字符串。我的代码如下。

我看了这个 -如何使用 Java 中的结构化流从 Kafka 反序列化记录?但无法将其移植到我的 csv 案例中。此外,我尝试了其他 spark sql 机制来尝试从“值”列中检索单个行,但无济于事。如果我确实设法获得了编译版本(例如,indivValues 数据集或 dsRawData 上的映射),我会收到类似于以下内容的错误:“org.apache.spark.sql.AnalysisException: cannot resolve ' IC' given input columns: [value];” . 如果我理解正确,那是因为 value 是一个逗号分隔的字符串,如果我不做“某事”,spark 并不会真正为我神奇地映射它。

  • 我需要将数据键入为上面显示的自定义模式,因为我将对其进行数学计算(对于每个新行与一些旧行相结合)。
  • 在将它们推送到主题之前,在 Kafka 连接器源任务中合成标题是否更好?有标题会使这个问题的解决更简单吗?

谢谢!