问题标签 [spark-kafka-integration]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
294 浏览

apache-spark - 对从 Kafka 读取的火花流数据帧执行多个过滤器操作的最佳方法是什么?

我需要在从 Kafka 主题读取的 DataFrame 上应用多个过滤器,并将每个过滤器的输出发布到外部系统(如另一个 Kafka 主题)。

我读过这样的kafkaDF

我可以foreachBatch在此 Dataframe 上运行 a 然后遍历过滤器列表以获取过滤后的数据,然后可以将其发布到 kafka 主题,如下所示

但是,考虑到这么多迭代,我不确定这是否是最好的方法。有没有比这样做更好的方法?

0 投票
0 回答
59 浏览

apache-spark - Spark 流式处理动态批处理间隔

我有批处理间隔为 30 秒的火花流处理。如果批次在队列中堆积,我想完全跳过创建的新批次或动态更改批次间隔以减少创建批次的频率。我了解背压可用于动态更新每批中处理记录的速率。我正在寻找的是以下之一

  • 如果没有,则为队列中的批次数量引入阈值。队列中的批次超过阈值,然后不添加新批次
  • 动态更新批处理间隔,以便当批处理队列大小超出一定限制时,我可以增加批处理间隔,以便将更少的批处理添加到队列中。

有什么办法可以达到我的要求吗?

编辑:我使用的火花版本是 2.2.0

0 投票
0 回答
189 浏览

scala - 已解决 - 无法从 spark 设置 Kafka 连接的 TLS 参数

我在设置使用 TLS从spark连接到 Kafka 所需的参数时遇到问题。这是我目前的做法:

我还尝试使用前缀kafka.并将配置包含在我的 spark 提交中(使用--conf或包含.jks文件位置--files)。如果我使用spark.read而不是spark.readStream.

问题可以在日志中表示,其中我设置的参数仍然为空或继续具有默认值。此外,连接失败,就像我在不使用 TLS 证书的情况下尝试连接时一样(这是我当前的 kafka 所必需的):

目前,我正在使用spark 3.0.0and scala 2.12。另外,我正在使用以下命令提交作业:

有没有人有类似的问题?谢谢你。

更新 使用以下选项解决了我的问题:

0 投票
0 回答
12 浏览

apache-spark - Spark 流式传输:后面的 RDD 是否可能比 prevois RDD 更早完成计算?

背景是我正在尝试使用 Redis 来存储 Kafka 偏移量。

如果允许前一个 RDD 晚于后一个 RDD 计算,我认为后一个 RDD 已经写入的偏移量将被前一个 RDD 覆盖。因此,下次重新启动 Spark 程序时可能会导致重复消费。

0 投票
0 回答
118 浏览

apache-spark - 如何按时间限制来自kafka的火花消耗数据

我是 Spark 的新手。

我有一个火花流批处理作业(也许它应该是结构流),它每小时从 kafka 接收数据。

我发现我的火花一直在消耗数据并且不会停止。

所以我想控制它,例如,

有什么想法吗?谢谢。

- - - - 代码 - - - - -

0 投票
1 回答
250 浏览

apache-spark - Spark Kafka 数据消费包

我尝试使用文档中提到的以下代码来使用我的 kafka 主题:

我得到了错误:

所以我尝试了:

安装 kafka 包及其依赖项。但我收到此错误:

我应该怎么做才能安装这个包?

0 投票
1 回答
42 浏览

scala - 如何在火花结构化流式查询(Kafka)之后调用方法?

我需要根据从主题收到的值执行一些功能。我目前正在使用 ForeachWriter 将所有主题转换为列表。现在,我想将此列表作为参数传递给方法。

这是我到目前为止所拥有的

Ans 这就是我如何称呼我的流式查询

问题:

  1. 如何在流式作业中调用独立(非流式)方法?
  2. 我不能在这里使用 ForeachWriter,因为我想将 SparkSession 传递给方法,并且由于 SparkSession 不可序列化,所以我不能使用 ForeachWriter。并行调用方法 doA 和 doB 的替代方法是什么?
0 投票
1 回答
1158 浏览

python - kafka 与 Pyspark 结构化流式传输 (Windows) 的集成

在我的 windows 10 机器上安装 anaconda 后,我按照以下教程在我的机器上设置它并使用 jupyter 运行它:https ://changhsinlee.com/install-pyspark-windows-jupyter/

  • spark版本是3.1.2 python是3.8.8所以它是兼容的,现在将kafka与pyspark集成这是我的代码:

在这里它向我显示了我需要部署连接器的错误:

AnalysisException:找不到数据源:kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序

我转到页面并找到部署它的命令是:./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...

  • 我导航到我的 spark 文件夹所在的位置,并在 PowerShell 中以管理员身份执行命令,但出现以下错误:

我尝试寻找解决方案,但没有任何效果,我不知道在 --class 他们告诉我要添加的参数中传递什么,它说:访问拒绝,这意味着访问被拒绝我不明白这一点,所以谁能告诉我该怎么做?

  • ps:环境变量都到位并且工作完美,所以我不认为它来自那个
0 投票
1 回答
176 浏览

apache-spark - 如何在 ForeachWriter[Row] 中创建数据框

我有一个从 Kafka 作为源读取的流式查询。我想对从流中接收到的每个批次执行一些逻辑。到目前为止,我是这样做的

我使用的是 Spark 2.3.2,所以我坚持使用 ForeachWriter(我不能使用 foreachBatch,这会让我的生活更简单)。我也知道 foreach() 对执行程序执行。所以,记住这一点,我向所有执行者广播了 sparkSession。但这也无济于事。这是代码片段的注释部分。

我正在寻找一种解决方案,在 Spark 2.3.2 中将数据处理为 foreach 中的数据框(我必须使用数据框/数据集,因为操作非常繁重......它们也包括操作)

我发现了一个类似的问题,但没有任何回应 -->类似的 q

0 投票
0 回答
134 浏览

scala - Spark kafka 生产者在 kafka 摄取期间引入重复记录

我写了一个 spark kafka 生产者,它从 hive 中提取消息并推入 kafka,当我们摄入 kafka 时,大多数记录(消息)都会重复,尽管在推入 kafka 之前我没有任何重复。我添加了与一次性语义相关的配置,使 kafka 生产者具有幂等性

下面是我用于 kafka 生产者的代码片段

在kafka消费者端设置isolation.level-->Committed。

尝试设置 min.insync.replicas-->2(在我看来这个属性可能不会起重要作用,仍然尝试) Spark 版本:2.3.1 kafka 客户端版本:2.2.1

而且我在将消息生成到 kafka 时也使用事务,初始化开始并为每条消息提交事务。我一次摄取大约 1 亿条记录,我确实将数据分成更小的块,比如之前一次将 1 亿分成 100 万摄入卡夫卡

尝试使用结构化流,仍然没有运气

我不确定我是否缺少 kafka 生产者、代理或消费者端的任何配置。不知道我是否可以包括任何

提前致谢