问题标签 [spark-kafka-integration]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
217 浏览

apache-spark - 将 Spark DataFrame 写入 Kafka 是忽略分区列和 kafka.partitioner.class

我正在尝试将 Spark DF(批处理 DF)写入 Kafka,我需要将数据写入特定分区。

我尝试了以下代码

我正在编写的 DF 架构是

我必须重新分区(到 1 个分区)“myDF”,因为我需要根据日期列对数据进行排序。

它将数据写入单个分区,但不是 DF 的“分区”列中的数据或自定义分区器返回的数据(与分区列中的值相同)。

谢谢萨蒂什

0 投票
2 回答
1945 浏览

scala - Spark 从 Kafka 批量读取并使用 Kafka 跟踪偏移量

我知道使用 Kafka 自己的偏移跟踪而不是其他方法(如检查点)对于流式作业是有问题的。

但是我只想每天运行一个 Spark 批处理作业,读取从最后一个偏移量到最近偏移量的所有消息,并用它做一些 ETL。

理论上,我想像这样读取这些数据:

并让 Spark 根据group.id

不幸的是,Spark 从来没有将这些提交回来,所以我创造性地在我的 etl 工作结束时添加了这段代码,用于手动更新 Kafka 中消费者的偏移量:

这在技术上是可行的,但是下一次基于这个 group.id 阅读 Spark 仍然会从头开始。

我是否必须咬紧牙关并在某处跟踪偏移量,还是我忽略了一些东西?

顺便说一句,我正在使用EmbeddedKafka进行测试

0 投票
2 回答
259 浏览

apache-spark - 根据 Dataframe 中的条件向 Kafka 主题发送数据

我想根据 SparkStreaming 中数据的值更改 Kafka 主题目标以保存数据。有可能再次这样做吗?当我尝试以下代码时,它只执行第一个,但不执行较低的进程。

数据存储在主题名称 test 中。谁能想到一种方法来做到这一点?

我更改了目的地以保存这样的数据框。

类型A到主题测试。类型 B 到主题 testB。

0 投票
0 回答
190 浏览

scala - 使用 SASL 机制的 Spark Kafka 批量写入引发超时异常,主题不存在于元数据中

我正在从 cassandra db 读取数据并对其进行一些转换,然后通过 .save() 批处理方法将数据发送到 kafka。我也在使用 Kafka Producer 来设置属性。但每次我收到以下错误: 原因:org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题 XXXXXXXXXXXXX。所有配置、凭据均已设置。相同的代码在本地运行良好,因为那里没有 SASL 机制,但在集群上出现上述异常。请帮忙。

0 投票
1 回答
282 浏览

apache-spark - 在来自 Kafka 的结构化流中倒带和重新使用偏移量

有没有办法可以在结构化流中回退偏移量?我使用的是 Spark 版本 3,并且我已经将我的startingoffset 配置为最早,之后每次重新启动都会从检查点目录中选择偏移值。

例如:Kafka当前的偏移量是1000,checkpoint目录中提交的偏移量是900。我想再次从800重新消耗偏移量。我怎样才能做到这一点?

如果我取消当前运行并使用以下命令重置消费者组的偏移值。结构化流是否会在重新启动时从那里选择偏移量而不是考虑检查点目录?

0 投票
1 回答
464 浏览

apache-spark - 如何避免在火花流中排队

我有直接流媒体的火花流,我正在使用下面的配置

批处理间隔 60s

spark.streaming.kafka.maxRatePerPartition 42

auto.offset.reset 最早

当我开始使用最早选项的流式批处理时,为了更快地使用来自 Kafka 的消息并减少延迟,我将 spark.streaming.kafka.maxRatePerPartition 保持为 42。所以它应该消耗 42 x 60s x 60 分区 = 每批 151200 条记录.

我这里有两个问题

  1. 我看到这两个初始批次正确地消耗了 151200 条记录,尽管有很多记录要从 kafka 消耗,但在后面的批次中逐渐减少。请看下面的截图。可能是什么原因
  2. 我看到批次正在排队很多。我们怎样才能避免这种情况。

是否有可能实现以下场景我们将批处理间隔设置为 60s,如果每个批处理在 60s 内运行,则下一个批处理可以按时开始。如果一个批次的时间超过 60 秒,我们不希望下一批来排队。现有运行完成后,下一次运行可以通过选取该时间之前的记录来开始。这样我们就不会有滞后,也不会排队。

Spark UI - 问题 1 的屏幕截图

0 投票
1 回答
21 浏览

apache-kafka - Producer 正在发布到 Kafka,但无法从 Spark 结构化流中读取

我正在使用 Kafka 发布推文,它运行正常,因为我可以使用以下命令看到回声

但是当我尝试使用以下代码使用结构化流时

然后我得到一个输出,但它没有在 value 列下显示 Tweet。相反,我有一个奇怪的字母数字链,如下所示。我在没有截断列值的情况下进行了检查,得到了相同但更长的模式。

任何帮助了解情况将不胜感激。

0 投票
0 回答
144 浏览

apache-spark - 火花给予要求失败:文字必须具有与字符串对应的值,但找到了类字符串

我有一个火花 2.4.6,数据帧写为

但是在 3.0.2 版本中制作了一个 jar 并提交 spark 之后,它给出了:


在检查功能
时,我们在 3.0.2 https://github.com/apache/spark/blob/648457905c4ea7d00e3d88048c63f360045f0714/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions 中有该验证功能/literals.scala#L292

但在以前的版本中没有 https://github.com/apache/spark/blob/v2.4.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/表达式/literals.scala

在不升级 spark 版本的情况下是否有任何解决方案?
因为用 3.0.2 版重建 jar 工作正常。

0 投票
1 回答
92 浏览

apache-spark - 如何使用 Trigger.Once 选项在 Spark 3 Structure Stream Kafka/Files 源中配置 backpreasure

在 Spark 3 Behave of backpressure option on Kafka 和 File Source for trigger.once 场景发生了变化

但我有一个问题。当我想使用 TriggerOnce 时,如何为我的工作配置背压?

在 spark 2.4 我有一个用例,回填一些数据,然后启动流。所以我使用了一次触发器,但我的回填场景可能非常大,有时会因为洗牌和驱动程序内存而在我的磁盘上产生太大的负载,因为 FileIndex 缓存在那里。所以我使用 maxmaxOffsetsPerTriggermaxFilesPerTrigger控制我的 spark 可以处理多少数据。这就是我配置背压的方式。

现在你移除了这个能力,所以假设有人可以提出一个新的方法吗?

0 投票
2 回答
261 浏览

apache-spark - Spark Structured Streaming StreamingQueryListener.onQueryProgress 不按微批次调用?

我正在使用 Spark 3.0.2,并且我有一个流式作业,它使用来自 Kafka 的数据,触发持续时间为“1 分钟”。

我在 Spark UI 中看到定义的每 1 分钟有一个新作业,但我看到onQueryProgress每 5~6 分钟调用一次方法。我认为应该在每个微批处理之后直接调用此方法。

有没有办法控制这个持续时间并使其等于触发持续时间?