问题标签 [spark-kafka-integration]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
284 浏览

python - Kafka 与 Pyspark 结构化流式处理作业的集成卡在 [*] 中(使用 jupyter)

在安装 Pyspark 并对其进行测试后,它工作正常并为 kafka 集成添加了正确的连接器,现在当我尝试从同一网络中的另一台机器从 kafka 加载日期并开始工作时,它卡在 [*] 中,不错误,没有什么,我不明白这里的问题,所以请如果有人可以帮助我,这是我的代码:

在此处输入图像描述

  • 当我尝试领导 kafka 时,它没有显示任何错误并继续,但是当我尝试开始工作时,它卡住了
  • PS:奇怪的是我试图在托管kafka的机器关闭时运行这段代码,它加载了kafka,即:这段代码没有错误:
  • 并继续,直到它再次卡在最后一段代码中,如上图所示,这很奇怪
  • 所以请有什么建议吗?
0 投票
1 回答
42 浏览

json - 如何将 kafka 消息值转换为特定模式?

我正在尝试使用 Pyspark 从 Kafka 主题中读取数据。我想将该数据转换为特定的模式。但无法这样做。

这是我尝试过的:

我想要的是:

有没有办法获得所需的架构?

0 投票
1 回答
534 浏览

scala - Spark Scala:找不到数据源:kafka

我正在尝试使用sparkByExamples.com中的示例。但由于某种原因,spark 程序不会从 kafka 主题中读取数据。

代码在这里

错误信息如下:

基本上,我有一个 kafka topic1 < data.json 并从头开始阅读。

我最近开始了 spark + scala + kafka 活动,非常感谢您的帮助。

pom.xml:

0 投票
1 回答
155 浏览

apache-spark - PySpark Kafka - NoClassDefFound:org/apache/commons/pool2

我在将数据从 kafka 主题打印到控制台时遇到问题。我收到的错误消息如下图所示。 任务错误中的异常

正如您在上图中看到的那样,在第 0 批之后,它不会进一步处理。

错误2

错误3

所有这些都是错误消息的快照。我不明白发生错误的根本原因。请帮我。

以下是kafka和spark版本:

我正在使用以下罐子:

这是我的代码:

0 投票
0 回答
182 浏览

apache-spark - Spark AQE Post-Shuffle partitions coalesce 无法按预期工作,甚至在某些分区中造成数据倾斜。为什么?

我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后的分区变得比以前更糟糕。

我的查询,在高层次上,看起来:

  1. 可能导致倾斜的列->是的,我的数据分布不均,这就是我使用盐的原因。
  2. 我从 Kafka 读取数据,所以我使用 Kafka 分区 + 偏移列作为盐。
  3. 为什么在后台使用 reprtitoinByRange 的排序对我没有帮助,我想启用 AQE?-> 现在我看到我的 Kafka 消息的大小差异可能太大。因此,我看到范围重新分区后的分区具有几乎相同数量的记录,但字节数仍然非常不均匀。
  4. 为什么我认为 AQE 必须帮助我?-> 我想创建许多小范围,即使我的数据偏差不会超过~50mb,所以后洗牌合并将能够将它们合并到目标大小(256mb)。在我的情况下,最高 320mb 是可以的。

我的第一个假设是,即使范围很小,峰值也会太大。但我检查并确认按范围重新分区给了我良好的记录分布,但不好的是大小。我有近 200 个分区,记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。但是通过 AEQ 和重新分区到 18000 个小范围,最小的分区是 18mib,最大的分区是 1.8Gib。这种情况比没有 AEQ 的情况要糟糕得多。需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来确定分区大小(以字节为单位),并且我有自己的记录日志。

所以我开始调试问题,但是 AQE 的输入和输出没有足够的日志 ShufflePartitionsUtil.coalescePartitions。这就是我将查询重写为 repartitionByRange.sortWithingPartitoins 的原因。并通过额外的日志记录进行物理计划优化。我的日志告诉我,我最初的想法是正确的。

  • map 和 write shuffle 阶段之后的输入分区被拆分为足够小
  • Coalesce 算法将它们收集到一个正确的数量,并且分布在字节分区中。

最小大小是如此不同,因为最后一个分区的大小,这是预期的。TRACE 日志级别显示 99% 的分区接近 290mib。

  • 但是为什么 spark UI 显示出如此不同的结果呢?->

  • spark UI 可能有问题吗?->

  • 也许吧,但除了任务大小,一个任务的持续时间也太大了,这让我觉得 spark UI 还可以。

  • 所以我的假设是问题出MapOutputStatistics在我的阶段。但它总是坏掉还是只在我的情况下?->

  • 仅在我的情况下?-> 我做了一些检查以确认它。

    • 我从 s3(块大小为 120mb 的镶木地板文件)-> 中读取了相同的数据集,并且 AQE 按预期工作。洗牌后合并返回给我 188,按大小、分区很好地分布。重要的是要注意 s3 上的数据分布不均,但在读取过程中的 spark 将其拆分为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
    • 我从 Kafka 中读取了相同的数据集,但从分区函数中排除了具有偏斜的列 -> 并且 AQE 按预期工作。洗牌后合并返回给我 203,按大小、分区很好地分布。
    • 我尝试禁用缓存-> 这没有任何结果。我使用缓存,只是为了避免从 kafka 重复读取。因为按范围重新分区使用采样。
    • 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果是预期的,与我的合并输入日志显示的相同:17999 个文件,最小的接近 8mib,最大的 56mib。
  • 所有这些检查让我认为这MapOutputStatistics仅对我的情况是错误的。可能是如何与 Kafka 源相关联或我的 Kafka 输入数据分布非常不均匀的问题。

问题:

  • 那么有人知道我做错了什么吗?
  • 在我的情况下,我可以用输入数据做些什么来使后洗牌合并工作?
  • 如果你认为我是对的,请发表评论。

PS 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 某些分区可以比其他分区大 2 倍。minPartitions从具有 720 个分区和选项 * 3的 Kafka 主题中读取。

0 投票
0 回答
73 浏览

apache-kafka - 安装在单台 PC 上时如何提高 Kafka(生产者和消费者)的性能

我正在做一个视频分析项目,我必须从 10 个闭路电视摄像机中检测 5 种物体。而且,客户只提供了一个 Ubuntu PC 系统来部署我的视频分析引擎。

现在,我必须在一台 PC 上安装我所有的应用程序模块。

所以我选择下面的大数据技术栈来实现这个,

由于我只有一个 PC 和一个 SSD 驱动器,

如果我对主题 + 单一消费者方法使用单一经纪人 + 多个生产者 + 单一分区,可以吗?

或者我是否需要遵循其他方法才能获得高吞吐量?

如果有人在这方面提供建议,我将非常有帮助,

注意:我知道这将有一个单点故障(使用单台 PC)。但客户同意继续。

0 投票
0 回答
79 浏览

scala - 如何调整 Spark Streaming GC?

我正在使用带有 kafka 的 Spark 3.0.1 来聚合 12,5000,000 个用户的数据。记录约为每秒 200000 条。输出方式为“追加”,数据库为Mongo。Spark 从 kafka 读取数据并将结果写入 Mongo。然而,应用程序花费大量时间进行 GC。为什么应用程序使用了这么多内存?我想知道有没有办法及时释放内存或者我的配置错误?

运行参数为:

在此处输入图像描述

0 投票
1 回答
954 浏览

scala - Spark Kafka 连接失败:bootstrap.servers 中没有给出可解析的引导 URL

我尝试使用Spark 3.0.2阅读 kafka 主题,我使用以下库做了一个spark shell :

  • spark-sql-kafka-0-10_2.12-3.0.2.jar
  • kafka-avro-serializer-6.2.0.jar
  • kafka-clients-2.4.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar
  • spark-tags_2.12-3.0.2.jar
  • spark-token-provider-kafka-0-10_2.12-3.0.2.jar
  • commons-pool2-2.6.2.jar

我收到带有错误堆栈跟踪的以下输出:

谁能知道如何解决它?

这是 scala 中的火花代码:

非常感谢。

0 投票
0 回答
209 浏览

java - 来自 kafka 主题的 Spark 流中 max.partition.fetch.bytes 的意义

我正在使用带有 kafka_2.12-2.2.1 的 Spark 2.4.0

我试图了解消费者配置“max.partition.fetch.bytes”的影响。默认值为 1048576 字节 (1 MB),它限制了 1 个请求中每个分区可以获取的最大字节数。我正在评估我们是否可以增加此标志的值(以及 message.max.bytes)以给每个 Spark 执行器施加更多负载(假设我们在执行器上有足够的内存。在我的设置中,Spark Web UI 没有显示太多内存在每个执行器中使用)

假设我们有

  1. 让我们考虑只有 1 个批次(比如 30 秒批次间隔)有 90 条记录。
  2. 具有 3 个分区的 Kafka 主题“T1”
  3. 3 个 Spark 执行器,每个具有 1 个插槽/核心。
  4. 假设通过正确的压缩,消费者端可以消耗更多的数据。但是为了简单起见,我们假设所有消息都是在没有任何压缩的情况下发送的。
  5. 每条消息为 100k。
  6. 发送到主题 T1 的记录数:90

在 Spark Web UI 中,我可以看到 3 个执行程序/任务中的每一个都获得 1 个偏移量为 0 到 30 的分区。

由于每条消息的大小为 100K,因此在对 kafka 代理的 1 个请求中,每个分区只能获取 10 条消息(如 max.partition.fetch.bytes 所示)。我还可以在日志中看到,对于从每个执行程序发送到 Kafka 代理的每个分区 3 网络请求。这是来自 1 个执行者的缩短日志:

  1. NetworkClient:将 FETCH fetch_offset=0,partition_max_bytes=1048576 发送到 kafka 节点
  2. 提取器:在偏移量 0 处提取 ..
  3. NetworkClient:发送 FETCH fetch_offset=10,partition_max_bytes=1048576 到 kafka 节点
  4. Fetcher:在偏移量 10 处获取 ..
  5. NetworkClient:发送 FETCH fetch_offset=20,partition_max_bytes=1048576 到 kafka 节点
  6. Fetcher:在偏移量 20 处获取 ..

我可以在 Kafka 代理端看到这些偏移量的类似请求日志。

对于 90 条记录的流式批处理,3 个任务中的每一个都被分配为 0-30 的分区偏移量。

即3个Partitions for Topic,3个Spark Executors,3个Tasks,每个任务处理偏移0到30。

因为 max.partition.fetch.bytes 默认为 1 MB,所以每个分区数据从每个 Task 的 3 个网络请求中获取,偏移量为 0 到 30。 (0-10,10-20,20-30)

所以假设处理数据集的代码是

  1. 因此,只有在检索到所有分配的偏移量(到任务 i. 0 到 30)的数据后,才会执行保存操作。

  2. 或者任何增量数据(直到分配给任务的分区)已经被提取,保存操作被执行,这一直持续到所有偏移量(分配给任务)都被处理。

  3. 我相信我们可以增加 max.partition.fetch.bytes 来减少 no。假设我们有足够的带宽/内存或在另一端向 Kafka 代理发出的网络请求,使用它来限制每个获取请求的负载。

欣赏是否有人可以确认/纠正这些假设或提供更多解释。

0 投票
0 回答
176 浏览

apache-spark - 结构化流 + Kafka 集成中的 PySpark 错误异常(60000 毫秒后元数据中不存在主题)

Hadoop 3.3.1 + Spark 3.1.2

  • OpenJDK 1.8.0
  • 斯卡拉 2.12.14

图书馆

  • spark-sql-kafka-0-10_2.12-3.1.2.jar
  • kafka-clients-2.6.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.1.2.jar

打开 PySpark 外壳

PySpark 代码

异常错误消息:

主题分区信息。