问题标签 [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?
我试图从 [Databricks][1] 重现该示例并将其应用于 Kafka 的新连接器并触发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON ...
注意:主题以 JSON 格式写入 Kafka。
下面的代码行不通,相信是因为json列是字符串,与from_json签名的方法不匹配...
有小费吗?
apache-spark - 结构化流将 Parquet 文件写入 Hadoop
我能够将结构化流式传输的结果写入 Parquet 文件。问题是这些文件在本地文件系统中,现在我想将它们写入 Hadoop 文件系统。有没有办法做到这一点?
我使用了这段代码,但它说:
谢谢
scala - 带有本地 CSV 文件的 Spark 2.1.0 结构流式传输
只是为了学习新的 Spark 结构数据流,我尝试过这样的实验,但不确定我是否对流功能做错了什么。
首先,我从静态开始,只使用 Spark 2.1.0 附带的简单文本 (csv) 文件:
我可以得到如此合理的输出(在 Zepplin 下)。
并按照示例,我只是修改了代码以读取相同的文件并提供了架构
并且没有错误消息,所以我想将数据写入内存并使用以下代码查看结果:
但是,没有错误消息,我一直得到“空输出”
这些代码在 Zeppelin 0.7 下进行了测试,我不确定我是否在这里遗漏了什么。同时,我尝试了来自 Apache Spark 2.1.0 官方网站的示例,$nc -lk 9999
它运行得非常好。
如果我做错了什么,我可以学习吗?
[修改和测试]
- 我尝试将相同的文件 people.txt 复制到一个 .../csv/ 文件夹下的 people1.csv peopele2.csv people3.csv
val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()
我得到了这个:
因此,我可能认为这不是数据readstream()问题...
scala - 使用 foreach 附加到文本文件的 Spark 结构化流
我想使用结构化流向文本文件附加行。此代码导致SparkException: Task not serializable
. 我认为toDF
是不允许的。我怎样才能让这段代码工作?
python - 如何在 PySpark 2.1.0 中的事件时间窗口上定义 UDAF
我正在编写一个 Python 应用程序,它在一系列值上滑动一个窗口,每个值都有一个时间戳。我想对滑动窗口中的值应用一个函数,以便从 N 个最新值中计算分数,如图所示。我们已经使用 Python 库实现了该功能以利用 GPU。
我发现 Apache Spark 2.0 附带结构化流,它支持事件时间的窗口操作。如果您想从 .csv 文件中读取有限的记录序列,并希望在这样的滑动窗口中计算记录,您可以在 PySpark 中使用以下代码:
但是,我想在滑动窗口上应用预定义聚合函数以外的 UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的聚合函数只有 avg、max、min、求和,并计数。
还不支持?如果是这样,PySpark 什么时候支持它?
https://stackoverflow.com/a/32750733/1564381表明可以在 Java 或 Scala 中定义 UserDefinedAggregateFunction,然后在 PySpark 中调用它。看起来很有趣,但我想将我自己的 Python 函数应用于滑动窗口中的值。我想要一种纯粹的 Pythonic 方式。
ps 让我知道 Python 中除 PySpark 之外的任何可以解决此类问题的框架(在流上滑动的窗口上应用 UDAF)。
apache-spark - 如何将流数据集写入 Kafka?
我正在尝试对主题数据进行一些丰富。因此,使用 Spark 结构化流从 Kafka 接收器读取回 Kafka。
但我得到一个例外:
任何解决方法?
apache-spark - SparkSql - Join Query execution throws 'object is not an instance of declaring class'
I am executing query on SparkSession
which throws Object is not an instance of declaring class
, below is the code after which
The exception is during method count()
I have also observed if the query is simple select col from table1
, that runs fine but join query above causes error.
I am using Spark 2.1 and to create SparkSession I do below
More stack trace below :-
apache-spark - 未调用结构化流 2.1.0 Foreach
我正在尝试在纱线集群上使用 Kafka 测试结构化流的 foreach 并拥有这段小代码:
当我运行这个程序时,没有调用 open()。我已经比较了其他线程的其他示例,但我无法找出缺少的内容。在 foreach 之后有一个 start() 调用,这似乎与此处的文档相匹配。
我可以流式传输到文件或控制台,但无法在 foreach 中获得任何调用。
关于在哪里看的任何建议?TIA。
apache-spark - 如何使用 from_json 允许消息具有不同的字段?
我正在尝试使用 Spark Structured Streaming 处理来自 Kafka 的数据。提取数据的代码如下:
ds
是一个 DataFrame,其中包含从 Kafka 消耗的数据。
当我尝试读取 JSON 以进行更快的查询时,问题就来了。来自的功能org.apache.spark.sql.functions
from_json()
要求强制提供模式。如果消息有一些不同的字段怎么办?
apache-spark - 如何编写数据集进入关于 Spark Structured Streaming - Java8 的 kafka 输出主题
我正在尝试ForeachWriter
在 Spark 2.1 中使用接口它的接口,但我不能使用它。