问题标签 [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 为什么转换在结构化流中只做一次副作用(println)?
为什么select
每批都打印该语句,但hello world
只打印一次?
scala - 如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?
我遇到了带 Spark 的结构化流,它有一个从 S3 存储桶持续消费并将处理后的结果写入 MySQL 数据库的示例。
这如何与Spark Kafka Streaming一起使用?
有没有办法在不使用的情况下结合这两个例子stream.foreachRDD(rdd => {})
?
java - Apache Spark 会话:IOException:(路径)的 mkdir 失败
我正在测试新版本的 Apache Spark 2.0,尝试利用结构化流功能,使用非常简单的代码创建带有流数据的数据集,然后打印创建的数据集。这是我的代码:
问题是我得到一个 IOException: mkdir of (temporary directory) failed。有人可以帮我解决这个问题吗?太感谢了。
这是显示的完整错误:
apache-spark - 将流数据集附加到 Spark 中的批处理数据集
我们在 Spark 中有一个用例,我们希望将历史数据从数据库加载到 Spark,并继续向 Spark 添加新的流数据,然后我们可以对整个最新数据集进行分析。
据我所知,Spark SQL 和 Spark Streaming 都不能将历史数据与流数据结合起来。然后我发现了 Spark 2.0 中的 Structured Streaming,它似乎是为这个问题而构建的。但是经过一些实验,我仍然无法弄清楚。这是我的代码:
我收到错误“org.apache.spark.sql.AnalysisException:不支持流和批处理 DataFrames/Datasets 之间的联合;” 当我联合()两个数据集时。
有人可以帮我吗?我会走错方向吗?
scala - Spark Structured Streaming MemoryStream 报告 用于自定义接收器时未选择数据
我正在尝试编写简单的测试用例来使用火花结构流。代码的灵感来自 github 上的holdenk。
这是 CustomSink 代码
我尝试使用 MemoryStream 在测试用例中运行它
无线报错input.addData("init")
init
如果添加线路,则不会到达接收器input.addData("init")
如果我取消注释行,测试用例可以成功运行而不会报告错误input.addData("init")
。
但是值init
没有到达接收器。仅hi hi
显示值。
为什么以及如何解决它?
postgresql - sSpark 结构化流 PostgreSQL updatestatebykey
如何通过由INPUT PostgreSQL 表中的更改触发的 Spark 结构化流计算来更新OUTPUT TABLE的状态?
作为现实生活场景中的 USERS 表已更新user_id = 0002
,如何仅为该用户触发 Spark 计算并将结果写入/更新到另一个表?
apache-spark - Spark - 使用 Firehose 从分区文件夹中读取 JSON
Kinesis firehose 管理文件的持久性,在本例中为时间序列 JSON,到按 YYYY/MM/DD/HH 分区的文件夹层次结构中(以 24 编号为小时)......很棒。
那么如何使用 Spark 2.0 读取这些嵌套的子文件夹并从所有叶子 json 文件创建一个静态数据框?数据框阅读器是否有“选项”?
我的下一个目标是让它成为一个流式 DF,其中由 Firehose 持久化到 s3 中的新文件使用 Spark 2.0 中的新结构化流自然成为流式数据帧的一部分。我知道这都是实验性的——希望有人以前使用过 S3 作为流文件源,其中数据被划分到如上所述的文件夹中。当然更喜欢直接的 Kinesis 流,但是这个连接器上没有 2.0 的日期,所以 Firehose->S3 是临时的。
ND:我正在使用 databricks,它将 S3 挂载到 DBFS 中,但当然也很容易成为 EMR 或其他 Spark 提供程序。如果一个可以共享的笔记本也很高兴看到一个例子。
干杯!
xml - 火花流 xml 文件
我需要处理流入 S3 文件夹的 xml 文件。目前,我已将其实现如下。
一、使用Spark的fileStream读取文件
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
对于每个 RDD,检查是否有任何文件被读取
将字符串写入新的 HDFS 目录
创建从上述 HDFS 目录读取的 Dataframe
对 Dataframe 进行一些处理并保存为 JSON
不知何故,我觉得上述方法非常低效,坦率地说相当学校孩子气。有更好的解决方案吗?任何帮助将不胜感激。
后续问题:如何操作数据框中的字段(而不是列)?我有一个非常复杂的嵌套 xml,当我使用上述方法时,我得到一个包含 9 列和 50 个奇数内部结构数组的数据框。这很好,除了需要修剪某些字段名称。有没有办法在不爆炸数据框的情况下实现这一点,因为我需要再次构建相同的结构?
scala - 监控结构化流
我有一个运行良好的结构化流设置,但我希望在它运行时对其进行监控。
我已经建立了一个 EventCollector
我已经构建了一个 EventCollector 并将侦听器添加到我的 spark 会话中
然后我启动查询
但是,onQueryProgress 永远不会受到打击。onQueryStarted 确实如此,但我希望以一定的时间间隔获得查询的进度,以监控查询的执行情况。有人可以帮忙吗?