问题标签 [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 如何使用 Java 中的结构化流从 Kafka 反序列化记录?
我使用 Spark 2.1。
我正在尝试使用 Spark Structured Streaming 从 Kafka 读取记录,对它们进行反序列化并在之后应用聚合。
我有以下代码:
我想要的是将该value
字段反序列化到我的对象中,而不是强制转换为String
.
我有一个自定义的反序列化器。
我怎样才能在 Java 中做到这一点?
我发现的唯一相关链接是这个https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2 .html,但这是针对 Scala 的。
apache-spark - 如何使用 Spark Structured Streaming 打印 Json 编码的消息
我有一个DataSet[Row]
每行都是 JSON 字符串的地方。我只想打印 JSON 流或计算每批的 JSON 流。
到目前为止,这是我的代码
我的问题真的是我需要做什么才能使用结构化流打印从 Kafka 接收的数据?Kafka 中的消息是 JSON 编码的字符串,因此我将 JSON 编码的字符串转换为某种结构,并最终转换为数据集。我正在使用 Spark 2.1.0
apache-spark - 结构化流如何执行单独的流查询(并行或顺序)?
我正在编写一个测试应用程序,它使用来自 Kafka 的 topcis 的消息,然后将数据推送到 S3 和 RDBMS 表中(流程类似于此处介绍的:https ://databricks.com/blog/2017/04/26/processing-data -in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)。所以我从 Kafka 读取数据,然后:
- 每条消息都想保存到 S3
- 一些消息保存到外部数据库中的表 A(基于过滤条件)
- 一些其他消息保存到外部数据库中的表 B(其他过滤条件)
所以我有这样的:
(请注意,我正在阅读多个 Kafka 主题)。接下来我定义所需的数据集:
现在为每个数据集创建查询以开始处理:
现在我想知道:
这些查询是否会并行执行(或按 FIFO 顺序一个接一个地执行,我应该将这些查询分配给单独的调度程序池)?
apache-spark - 使用 Spark 结构化流的实时数据标准化/规范化
在实施机器学习算法时,标准化/规范化数据是必不可少的,如果不是关键的话。使用 Spark 结构化流以实时方式执行此操作一直是我过去几周一直试图解决的问题。
事实证明,使用历史数据的StandardScaler
估计器((value(i)-mean) /standard deviation)
非常好,在我的用例中,获得合理的聚类结果是最好的,但我不确定如何StandardScaler
使用实时数据拟合模型。结构化流不允许它。任何建议将不胜感激!
换句话说,如何在 Spark 结构化流中拟合模型?
apache-spark - 一个主题中多个分区的 Spark Structured Streaming
我们如何在 Spark 结构化流中为多个分区构建 JSON。我在此处粘贴的以下示例仅用于一个分区。感谢你的帮助。
scala - 无界表是火花结构化流
我开始学习 Spark,并且很难理解 Spark 中结构化流背后的合理性。结构化流将所有到达的数据视为无界输入表,其中数据流中的每个新项目都被视为表中的新行。我有以下代码可以将传入的文件读入csvFolder
.
如果我将一个 1GB 的文件转储到该文件夹会发生什么。根据规范,流式作业每隔几毫秒触发一次。如果 Spark 在下一瞬间遇到这么大的文件,在尝试加载文件时会不会内存不足。还是自动批处理?如果是,这个批处理参数是否可配置?
apache-spark - 从 spark-shell (pyspark) 查询 spark 流应用程序
我在控制台中遵循这个例子pyspark
,一切都很完美。
之后,我将其编写为 PySpark 应用程序,如下所示:
该应用程序执行如下:
现在,如果我在另一个终端中打开一个新的火花驱动器:
并尝试运行:
它失败了:
我不明白发生了什么。
apache-spark - Spark:在流式查询中使用事件时间滑动窗口时出现问题
我正在 Spark 2.2 中进行实时数据流传输。根据我的问题陈述,我想在 120 秒的滑动窗口内查询数据。期间。我streamingquery
每 1 秒触发一次。因此,理想情况下,查询应该只运行前 120 秒。数据(更新/新数据)。
但是当我运行查询时,它运行在 120 秒之前的整个数据上。(旧)数据。这意味着窗口在已经处理的数据上滑动。
这背后的原因可能是什么?以及如何仅将窗口应用于新数据(未处理)?
apache-spark - 结构化流 + Kafka 集成 - SSL 和 Kerberos 集成?
我正在尝试从常规流媒体切换到结构化流媒体是否值得。Spark 2.1.x 结构化流是否支持从受保护的 Kafka (SSL_SASL) 消费。意思是 Kerberos 和 SLL。