问题标签 [spark-kafka-integration]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 如果我使用Redis存储偏移量并且Sink是MySQL,是否可以实现exactly-once语义?
在我看来,如果我需要实现exactly-once语义,存储偏移量和确认提交这两个操作应该组合在一起作为一个事务,因此仅在MYSQL中很容易完成。
但是,我仍然很好奇如果我在两个不同的系统中执行这两个操作(如标题中所述)是否可以实现这一点,因为有时接收器是多种多样的,我想使用 Redis 作为管理偏移量的唯一系统。
apache-spark - 如何将我的结构化流数据帧发送到 kafka?
大家好 !
我正在尝试将我的结构化流数据帧发送到我的 kafka 主题之一,detection
.
这是结构化流数据帧的架构:
但后来我尝试使用这种方法发送数据帧:
然后我得到了错误:
pyspark.sql.utils.AnalysisException:无法解析“ value
”给定输入列:[DestinationComputer、DestinationPort、Sigma、SourceComputer、SourcePort、byteCount、duration、packetCount、processName、protocol、time、timestamp];第 1 行第 5 行;
如果我发送一个带有value
它工作的列的数据框,我会收到关于我的 kafka 主题消费者的数据。
任何想法发送我的所有列的数据框?
谢谢 !
apache-spark - spark 3.2.0 与 2.4.8 中的结构化流检查点位置
我有一个在 spark 2.4.8 hadoop 2.6 build 上运行的 pyspark 脚本。
脚本是一个简单的带有 kafka 的结构化流。从主题中读取、提取、过滤、映射和写入另一个 kafka 主题。减少测试看起来像这样。
工作正常。
最近我设置了一个新的火花安装。没有 hadoop 分发的 Sprark 3.2.0 通过 SPARK_DIST_CLASSPATH 添加到 spark 的类路径中的 hadoop 2.6 包。
我正在连接到同一个 hadoop。
这就是问题所在。Spark 抛出与检查点位置相关的异常。
Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/user/me/spark/checkpoints/spark-test
更奇怪的是,当我删除协议
.option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") \
时
.option("checkpointLocation", "/user/me/spark/checkpoints/spark-test") \
脚本运行,甚至它在提供的位置检查点到 hdfs。
我的问题是 - spark 2.4.x 和 3.2.x 之间的结构化流中的检查点有什么变化?如果是这样,我如何为检查点提供其他文件系统?还是只是我的自定义 spark 安装问题 - spark 3 和 classpath 上提供的 hadoop 2.6 包?
apache-spark - 从 Kafka 到 Elastic Search 的 Spark 结构化流
我想写一个从 Kafka 到 Elasticsearch 的 Spark Streaming Job。在这里,我想在从 Kafka 读取模式时动态检测模式。
你能帮我这样做吗?
我知道,这可以通过下一行在 Spark 批处理中完成。
val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema
但是在通过 Spark Streaming Job 执行相同的操作时,我们无法执行上述操作,因为流式处理只能在 Action 上进行。
请告诉我。
apache-spark - 将 Pyspark 与 Kafka 连接起来
我在理解如何连接 Kafka 和 PySpark 时遇到问题。
我在 Windows 10 上安装了 kafka,主题很好地流式传输数据。我已经安装了运行正常的 pyspark——我能够毫无问题地创建测试 DataFrame。
但是当我尝试连接到 Kafka 流时,它给了我错误:
AnalysisException:找不到数据源:kafka。请按照“Structured Streaming-Kafka 集成指南”的部署部分部署应用程序。
Spark 文档并没有真正的帮助 - 它说: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...
对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。
然后当你去部署部分它说:
与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。spark-sql-kafka-0-10_2.12 及其依赖可以直接使用 --packages 添加到 spark-submit 中,例如 ./bin/spark-submit --packages org.apache.spark:spark-sql- kafka-0-10_2.12:3.2.0 ...
我正在开发应用程序,我不想部署它。如果我正在开发 pyspark 应用程序,在哪里以及如何添加这些依赖项?
尝试了几个教程最终变得更加困惑。
看到回答说
“您需要将 kafka-clients JAR 添加到您的 --packages”。如此回答
很少有更多的步骤可能有用,因为对于新手来说,这还不清楚。
版本:
- 卡夫卡 2.13-2.8.1
- 火花3.1.2
- 爪哇 11.0.12
所有环境变量和路径均已正确设置。
编辑
我已经加载:
正如建议的那样,但仍然出现相同的错误。我已经三重检查了 kafka、scala 和 spark 版本并尝试了各种组合,但没有奏效,我仍然遇到同样的错误:
AnalysisException:找不到数据源:kafka。请按照“Structured Streaming-Kafka 集成指南”的部署部分部署应用程序。
编辑 2
我安装了最新的 Spark 3.2.0 和 Hadoop 3.3.1 和 kafka 版本 kafka_2.12-2.8.1。更改了所有环境变量,测试了 Spark 和 Kafka - 工作正常。
我的环境变量现在看起来像这样:
仍然没有运气,我得到同样的错误:(
java - 具有翻转窗口延迟和重复数据的 Spark 结构化流
我正在尝试从 kafka 主题中读取,在翻滚的窗口上聚合一些数据并将其写入接收器(我一直在尝试使用 Kafka 和控制台)。
我看到的问题是
- 发送数据和接收接收器上窗口的聚合记录之间的长时间延迟(应触发预期触发器后的几分钟)
- 来自先前窗口聚合的重复记录出现在后续窗口中
为什么延迟这么长,我能做些什么来减少它?
为什么会显示以前窗口中的重复记录,如何删除它们?
随着窗口变短,延迟似乎特别糟糕——当我将窗口持续时间设置为 10 秒时,延迟时间为 3 分钟以上,当窗口持续时间设置为 60 秒时,延迟时间约为 2 分钟。
在最短的窗口时间下,我还看到记录被“捆绑”起来,因此当接收器接收到记录时,我一次收到几个窗口的记录。
在重复的聚合记录中,我确实将输出模式设置为完成,但我的理解是,如果触发器在其中多次触发,则记录应该只在当前窗口中重复,而我的不应该这样做。
我设置了与窗口时间和 10%(1 或 6 秒)的水印阈值相匹配的处理触发器,并且我知道如果我移除翻转窗口,流本身可以正常工作。
我明白为什么 spark 可能无法触发特定频率的触发器,但我认为 10 秒,当然 60 秒足以处理我正在测试的非常有限的数据量。
使用 60 秒翻转窗口和处理时间触发器发送数据的示例
- 发送 6 个有效载荷
- 等一下
- 发送 1 个有效载荷
- 稍等片刻
- 发送 3 个有效载荷
(CreateTime 来自带有 --property print.timestamp=true 的 kafka-console-consumer)。这些在我期望触发器根据 CreateTime 时间戳和窗口触发后几分钟到达。
我有时确实会看到如下消息,但没有其他 WARN 或 ERROR 级别的消息表明存在问题:
应用数据/代码
示例数据如下所示,由 ts 设置为当前时间的 python 脚本生成:
应用程序代码(嵌入 spark.master=local[*] 运行)