问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1302 浏览

apache-spark - 在 Spark Streaming (Spark 2.0) 中使用 Kafka

0 投票
1 回答
322 浏览

scala - 为什么转换在结构化流中只做一次副作用(println)?

为什么select每批都打印该语句,但hello world只打印一次?

0 投票
2 回答
4408 浏览

scala - 如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?

我遇到了带 Spark 的结构化流,它有一个从 S3 存储桶持续消费并将处理后的结果写入 MySQL 数据库的示例。

这如何与Spark Kafka Streaming一起使用?

有没有办法在不使用的情况下结合这两个例子stream.foreachRDD(rdd => {})

0 投票
3 回答
1349 浏览

java - Apache Spark 会话:IOException:(路径)的 mkdir 失败

我正在测试新版本的 Apache Spark 2.0,尝试利用结构化流功能,使用非常简单的代码创建带有流数据的数据集,然后打印创建的数据集。这是我的代码:

问题是我得到一个 IOException: mkdir of (temporary directory) failed。有人可以帮我解决这个问题吗?太感谢了。

这是显示的完整错误:

0 投票
1 回答
1563 浏览

apache-spark - 将流数据集附加到 Spark 中的批处理数据集

我们在 Spark 中有一个用例,我们希望将历史数据从数据库加载到 Spark,并继续向 Spark 添加新的流数据,然后我们可以对整个最新数据集进行分析。

据我所知,Spark SQL 和 Spark Streaming 都不能将历史数据与流数据结合起来。然后我发现了 Spark 2.0 中的 Structured Streaming,它似乎是为这个问题而构建的。但是经过一些实验,我仍然无法弄清楚。这是我的代码:

我收到错误“org.apache.spark.sql.AnalysisException:不支持流和批处理 DataFrames/Datasets 之间的联合;” 当我联合()两个数据集时。

有人可以帮我吗?我会走错方向吗?

0 投票
1 回答
923 浏览

scala - Spark Structured Streaming MemoryStream 报告 用于自定义接收器时未选择数据

我正在尝试编写简单的测试用例来使用火花结构流。代码的灵感来自 github 上的holdenk

这是 CustomSink 代码

我尝试使用 MemoryStream 在测试用例中运行它

无线报错input.addData("init")

init如果添加线路,则不会到达接收器input.addData("init")

如果我取消注释行,测试用例可以成功运行而不会报告错误input.addData("init")

但是值init没有到达接收器。仅hi hi显示值。

为什么以及如何解决它?

0 投票
1 回答
497 浏览

postgresql - sSpark 结构化流 PostgreSQL updatestatebykey

如何通过由INPUT PostgreSQL 表中的更改触发的 Spark 结构化流计算来更新OUTPUT TABLE的状态?

作为现实生活场景中的 USERS 表已更新user_id = 0002,如何仅为该用户触发 Spark 计算并将结果写入/更新到另一个表?

0 投票
2 回答
5198 浏览

apache-spark - Spark - 使用 Firehose 从分区文件夹中读取 JSON

Kinesis firehose 管理文件的持久性,在本例中为时间序列 JSON,到按 YYYY/MM/DD/HH 分区的文件夹层次结构中(以 24 编号为小时)......很棒。

那么如何使用 Spark 2.0 读取这些嵌套的子文件夹并从所有叶子 json 文件创建一个静态数据框?数据框阅读器是否有“选项”?

我的下一个目标是让它成为一个流式 DF,其中由 Firehose 持久化到 s3 中的新文件使用 Spark 2.0 中的新结构化流自然成为流式数据帧的一部分。我知道这都是实验性的——希望有人以前使用过 S3 作为流文件源,其中数据被划分到如上所述的文件夹中。当然更喜欢直接的 Kinesis 流,但是这个连接器上没有 2.0 的日期,所以 Firehose->S3 是临时的。

ND:我正在使用 databricks,它将 S3 挂载到 DBFS 中,但当然也很容易成为 EMR 或其他 Spark 提供程序。如果一个可以共享的笔记本也很高兴看到一个例子。

干杯!

0 投票
1 回答
2078 浏览

xml - 火花流 xml 文件

我需要处理流入 S3 文件夹的 xml 文件。目前,我已将其实现如下。

一、使用Spark的fileStream读取文件

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

对于每个 RDD,检查是否有任何文件被读取

将字符串写入新的 HDFS 目录

创建从上述 HDFS 目录读取的 Dataframe

对 Dataframe 进行一些处理并保存为 JSON

不知何故,我觉得上述方法非常低效,坦率地说相当学校孩子气。有更好的解决方案吗?任何帮助将不胜感激。

后续问题:如何操作数据框中的字段(而不是列)?我有一个非常复杂的嵌套 xml,当我使用上述方法时,我得到一个包含 9 列和 50 个奇数内部结构数组的数据框。这很好,除了需要修剪某些字段名称。有没有办法在不爆炸数据框的情况下实现这一点,因为我需要再次构建相同的结构?

0 投票
1 回答
5812 浏览

scala - 监控结构化流

我有一个运行良好的结构化流设置,但我希望在它运行时对其进行监控。

我已经建立了一个 EventCollector

我已经构建了一个 EventCollector 并将侦听器添加到我的 spark 会话中

然后我启动查询

但是,onQueryProgress 永远不会受到打击。onQueryStarted 确实如此,但我希望以一定的时间间隔获得查询的进度,以监控查询的执行情况。有人可以帮忙吗?