问题标签 [spark-kafka-integration]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
19 浏览

apache-spark - 如果我使用Redis存储偏移量并且Sink是MySQL,是否可以实现exactly-once语义?

在我看来,如果我需要实现exactly-once语义,存储偏移量和确认提交这两个操作应该组合在一起作为一个事务,因此仅在MYSQL中很容易完成。

但是,我仍然很好奇如果我在两个不同的系统中执行这两个操作(如标题中所述)是否可以实现这一点,因为有时接收器是多种多样的,我想使用 Redis 作为管理偏移量的唯一系统。

0 投票
1 回答
69 浏览

apache-spark - 如何将我的结构化流数据帧发送到 kafka?

大家好 !

我正在尝试将我的结构化流数据帧发送到我的 kafka 主题之一,detection.

这是结构化流数据帧的架构:

但后来我尝试使用这种方法发送数据帧:

然后我得到了错误:

pyspark.sql.utils.AnalysisException:无法解析“ value”给定输入列:[DestinationComputer、DestinationPort、Sigma、SourceComputer、SourcePort、byteCount、duration、packetCount、processName、protocol、time、timestamp];第 1 行第 5 行;

如果我发送一个带有value它工作的列的数据框,我会收到关于我的 kafka 主题消费者的数据。

任何想法发送我的所有列的数据框?

谢谢 !

0 投票
0 回答
106 浏览

apache-spark - spark 3.2.0 与 2.4.8 中的结构化流检查点位置

我有一个在 spark 2.4.8 hadoop 2.6 build 上运行的 pyspark 脚本。

脚本是一个简单的带有 kafka 的结构化流。从主题中读取、提取、过滤、映射和写入另一个 kafka 主题。减少测试看起来像这样。

工作正常。

最近我设置了一个新的火花安装。没有 hadoop 分发的 Sprark 3.2.0 通过 SPARK_DIST_CLASSPATH 添加到 spark 的类路径中的 hadoop 2.6 包。

我正在连接到同一个 hadoop。

这就是问题所在。Spark 抛出与检查点位置相关的异常。

Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/user/me/spark/checkpoints/spark-test

更奇怪的是,当我删除协议 .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") \.option("checkpointLocation", "/user/me/spark/checkpoints/spark-test") \

脚本运行,甚至它在提供的位置检查点到 hdfs。

我的问题是 - spark 2.4.x 和 3.2.x 之间的结构化流中的检查点有什么变化?如果是这样,我如何为检查点提供其他文件系统?还是只是我的自定义 spark 安装问题 - spark 3 和 classpath 上提供的 hadoop 2.6 包?

0 投票
1 回答
30 浏览

apache-spark - 从 Kafka 到 Elastic Search 的 Spark 结构化流

我想写一个从 Kafka 到 Elasticsearch 的 Spark Streaming Job。在这里,我想在从 Kafka 读取模式时动态检测模式。

你能帮我这样做吗?

我知道,这可以通过下一行在 Spark 批处理中完成。

val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema

但是在通过 Spark Streaming Job 执行相同的操作时,我们无法执行上述操作,因为流式处理只能在 Action 上进行。

请告诉我。

0 投票
1 回答
1015 浏览

apache-spark - 将 Pyspark 与 Kafka 连接起来

我在理解如何连接 Kafka 和 PySpark 时遇到问题。

我在 Windows 10 上安装了 kafka,主题很好地流式传输数据。我已经安装了运行正常的 pyspark——我能够毫无问题地创建测试 DataFrame。

但是当我尝试连接到 Kafka 流时,它给了我错误:

AnalysisException:找不到数据源:kafka。请按照“Structured Streaming-Kafka 集成指南”的部署部分部署应用程序。

Spark 文档并没有真正的帮助 - 它说: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。

然后当你去部署部分它说:

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。spark-sql-kafka-0-10_2.12 及其依赖可以直接使用 --packages 添加到 spark-submit 中,例如 ./bin/spark-submit --packages org.apache.spark:spark-sql- kafka-0-10_2.12:3.2.0 ...

我正在开发应用程序,我不想部署它。如果我正在开发 pyspark 应用程序,在哪里以及如何添加这些依赖项?

尝试了几个教程最终变得更加困惑。

看到回答说

“您需要将 kafka-clients JAR 添加到您的 --packages”。如此回答

很少有更多的步骤可能有用,因为对于新手来说,这还不清楚。

版本

  • 卡夫卡 2.13-2.8.1
  • 火花3.1.2
  • 爪哇 11.0.12

所有环境变量和路径均已正确设置。

编辑

我已经加载:

正如建议的那样,但仍然出现相同的错误。我已经三重检查了 kafka、scala 和 spark 版本并尝试了各种组合,但没有奏效,我仍然遇到同样的错误:

AnalysisException:找不到数据源:kafka。请按照“Structured Streaming-Kafka 集成指南”的部署部分部署应用程序。

编辑 2

我安装了最新的 Spark 3.2.0 和 Hadoop 3.3.1 和 kafka 版本 kafka_2.12-2.8.1。更改了所有环境变量,测试了 Spark 和 Kafka - 工作正常。

我的环境变量现在看起来像这样:

仍然没有运气,我得到同样的错误:(

0 投票
1 回答
34 浏览

java - 具有翻转窗口延迟和重复数据的 Spark 结构化流

我正在尝试从 kafka 主题中读取,在翻滚的窗口上聚合一些数据并将其写入接收器(我一直在尝试使用 Kafka 和控制台)。

我看到的问题是

  • 发送数据和接收接收器上窗口的聚合记录之间的长时间延迟(应触发预期触发器后的几分钟)
  • 来自先前窗口聚合的重复记录出现在后续窗口中

为什么延迟这么长,我能做些什么来减少它?

为什么会显示以前窗口中的重复记录,如何删除它们?

随着窗口变短,延迟似乎特别糟糕——当我将窗口持续时间设置为 10 秒时,延迟时间为 3 分钟以上,当窗口持续时间设置为 60 秒时,延迟时间约为 2 分钟。

在最短的窗口时间下,我还看到记录被“捆绑”起来,因此当接收器接收到记录时,我一次收到几个窗口的记录。

在重复的聚合记录中,我确实将输出模式设置为完成,但我的理解是,如果触发器在其中多次触发,则记录应该只在当前窗口中重复,而我的不应该这样做。

我设置了与窗口时间和 10%(1 或 6 秒)的水印阈值相匹配的处理触发器,并且我知道如果我移除翻转窗口,流本身可以正常工作。

我明白为什么 spark 可能无法触发特定频率的触发器,但我认为 10 秒,当然 60 秒足以处理我正在测试的非常有限的数据量。

使用 60 秒翻转窗口和处理时间触发器发送数据的示例

  • 发送 6 个有效载荷
  • 等一下
  • 发送 1 个有效载荷
  • 稍等片刻
  • 发送 3 个有效载荷

(CreateTime 来自带有 --property print.timestamp=true 的 kafka-console-consumer)。这些在我期望触发器根据 CreateTime 时间戳和窗口触发后几分钟到达。

我有时确实会看到如下消息,但没有其他 WARN 或 ERROR 级别的消息表明存在问题:

应用数据/代码

示例数据如下所示,由 ts 设置为当前时间的 python 脚本生成:

应用程序代码(嵌入 spark.master=local[*] 运行)