问题标签 [spark-streaming-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1125 浏览

apache-kafka - 如何从 Spark 结构化流中的特定 Kafka 分区中读取数据

我的 Kafka 主题有三个分区,我想知道我是否可以从三个分区中的一个分区中读取。我的消费者是 spark 结构化流应用程序。

下面是我在 spark 中现有的 kafka 设置。

0 投票
2 回答
1910 浏览

java - Spark 2.4.0 Avro Java - 无法解析方法 from_avro

我正在尝试从包含 Avro 消息的 kafka 队列运行火花流。

根据https://spark.apache.org/docs/latest/sql-data-sources-avro.html我应该可以from_avro用来将列值转换为Dataset<Row>.

但是,我无法编译该项目,因为它抱怨from_avro找不到。我可以看到在依赖的 package.class 中声明的方法。

如何在本地 Java 代码中使用该from_avro方法?org.apache.spark.sql.avro

pom.xml:

似乎Java无法从sql.avro.package.class

0 投票
2 回答
1358 浏览

python - 使用 PySpark Streaming 反序列化 Kafka json 消息

我有一个 pyspark 应用程序正在使用来自 Kafka 主题的消息,这些消息由org.apache.kafka.connect.json.JsonConverter. 我正在使用融合的 Kafka JDBC 连接器来执行此操作

问题是,当我使用消息时,ID 列出现在某种编码文本中,例如“ARM=”,而它应该是数字类型。

这是我现在拥有的代码

我知道 createDirectStream 有一个我可以设置的 valueDecoder 参数,问题是我不知道如何使用它进行解码。我也事先知道架构,因此如果需要,我将能够创建一个。

作为参考,这是我打印出 rdd.foreach 时得到的 JSON

0 投票
1 回答
284 浏览

apache-spark - 连续处理模式和python udf

Spark 2.4.0 是否支持具有连续处理模式的 Python UDF?

在我的简单代码中,我从一个 kafka 主题中消费,每行进行一些简单的处理(基本上向 json 消息添加一个虚拟字段)并写出另一个主题。

但是,当我使用连续模式触发时,我在输出主题中看不到任何消息。如果我不使用 python udf,那么它工作正常。

编辑:这个错误报告(与这篇文章无关)报告缺乏支持。

0 投票
1 回答
122 浏览

apache-spark - 了解 kakfa 结构化流中的检查点

在这篇(https://dzone.com/articles/what-are-spark-checkpoints-on-dataframes)文章中,它说检查点用于“在我做其他事情之前冻结数据帧的内容”。

然而,在这篇(http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-7/)文章中,它说检查点用于从故障中恢复。从这里我收集到,如果 spark 正在处理一个 kafka 主题并且 spark 崩溃,在它重新启动后,它将从它上次检查点的偏移量开始处理。这个说法正确吗?

spark中有两种不同的检查点概念吗?因为我无法调和2。

0 投票
0 回答
50 浏览

apache-spark - Spark 与具有两个不同主体的 Kafka 交互

我有以下问题。我正在使用 Spark Structured Streaming 作业,该作业从一个主题读取并写入同一 kerberized Kafka 集群的另一个主题。一切都超级好。

但我的问题如下:我将如何处理这两个主题中的每一个都有不同的主体和密钥表的情况。

  • 我应该向 Spark 提交两个 JAAS 文件(每个主体/keytab 一个)吗?如果是,如何实现?
  • 我可以将两个“KafkaClient”声明放入一个 JAAS 文件中吗?

非常感谢。

0 投票
1 回答
3594 浏览

apache-spark - Spark流和kafka缺少所需的配置“partition.assignment.strategy”,没有默认值

我正在尝试使用 yarn 与 Kafka 一起运行 spark 流应用程序。我收到以下堆栈跟踪错误-

原因:org.apache.kafka.common.config.ConfigException:缺少没有默认值的必需配置“partition.assignment.strategy”。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 在 org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) 在 org.apache.kafka.clients。 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) 上的 consumer.ConsumerConfig.(ConsumerConfig.java:194) org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) ) 在 org.apache.spark.streaming.kafka010.CachedKafkaConsumer.(CachedKafkaConsumer.scala:45) 在 org.apache.spark.streaming 的 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350)。 kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194) 在 org.

这是我如何使用火花流创建 KafkaStream 的代码片段-

我浏览了以下帖子-

  1. https://issues.apache.org/jira/browse/KAFKA-4547
  2. Pyspark Structured Streaming Kafka 配置错误

据此,我已将我的 fat jar 中的 kafka util jar 从默认从 spark-stream-kafka-jar 打包的 0.10.1.0 更新为0.10.2.0版本作为瞬态依赖项。当我通过将master设置为本地来在单个节点上运行它时,我的工作也可以正常工作。我正在运行 spark 2.3.1 版本。

0 投票
1 回答
527 浏览

apache-spark - 如何在 Spark Streaming 中创建与数据源的连接以进行查找

我有一个用例,我们正在流式传输事件,并且对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。Spark Streaming 将运行 40 个执行程序,我有 5 个这样的 Streaming 作业都连接到同一个 Redis 集群。所以我很困惑我应该采取什么方法来创建 Redis 连接

  1. 在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以用广播变量做到这一点吗?

  2. 为每个分区创建一个 Redis 连接,但是我的代码是这样编写的

    val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })

所以现在如果我在每个分区内创建一个连接,这意味着对于每个 RDD 和该 RDD 中的每个分区,我都将创建一个新连接。

有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

如果需要,我可以添加更多上下文/信息。

0 投票
1 回答
1276 浏览

apache-spark - Spark 结构化流恰好一次 - 未实现 - 重复事件

我正在使用 Spark Structured Streaming 来使用来自 Kafka 的事件并将它们上传到 S3。

检查点在 S3 上提交:

偏移量通过以下方式提交给 Kafka StreamingQueryListener

应用程序启动后,它会从 Kafka 检索偏移图并启动流:

我将topic/partition/offset数据存储在 ORC 文件中。

数据包含具有精确事件的多个副本topic/partition/offset

应该如何配置流以实现一次处理?

0 投票
0 回答
420 浏览

apache-spark - 使用 Kafka + Spark Streaming 恰好一次

是否可以通过在 Spark Streaming 应用程序中处理 Kafka 主题来实现一次?

要实现恰好一次,您需要以下内容:

  1. 卡夫卡生产者到卡夫卡经纪人的一次。这是由 Kafka 的 0.11 幂等生产者实现的。但是 Kafka 0.11 到 Spark Streaming 的集成生产准备好了吗?我发现这张 JIRA 票有很多错误。
  2. 在 Kafka 代理到 Spark Streaming 应用程序上恰好有一次。能实现吗?由于 Spark Streaming 应用程序故障,应用程序可以读取某些数据两次,对吗?作为解决方案,我可以将计算结果和最后处理的事件 uuid 保存到 Redis 事务吗?
  3. 通过 Spark Streaming 应用程序转换数据仅一次。这是 RDD 的开箱即用属性。
  4. 仅在持久结果上出现一次。在第二条语句中通过将最后一个事件 uuid 持久保存到 Redis 来解决。