问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
8606 浏览

scala - 如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?

我试图从 [Databricks][1] 重现该示例并将其应用于 Kafka 的新连接器并触发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON ...

注意:主题以 JSON 格式写入 Kafka。

下面的代码行不通,相信是因为json列是字符串,与from_json签名的方法不匹配...

有小费吗?

[更新]示例工作: https ://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

0 投票
2 回答
2885 浏览

apache-spark - 结构化流将 Parquet 文件写入 Hadoop

我能够将结构化流式传输的结果写入 Parquet 文件。问题是这些文件在本地文件系统中,现在我想将它们写入 Hadoop 文件系统。有没有办法做到这一点?

我使用了这段代码,但它说:

谢谢

0 投票
2 回答
5449 浏览

scala - 带有本地 CSV 文件的 Spark 2.1.0 结构流式传输

只是为了学习新的 Spark 结构数据流,我尝试过这样的实验,但不确定我是否对流功能做错了什么。

首先,我从静态开始,只使用 Spark 2.1.0 附带的简单文本 (csv) 文件:

我可以得到如此合理的输出(在 Zepplin 下)。

并按照示例,我只是修改了代码以读取相同的文件并提供了架构

并且没有错误消息,所以我想将数据写入内存并使用以下代码查看结果:

但是,没有错误消息,我一直得到“空输出”

这些代码在 Zeppelin 0.7 下进行了测试,我不确定我是否在这里遗漏了什么。同时,我尝试了来自 Apache Spark 2.1.0 官方网站的示例,$nc -lk 9999它运行得非常好。

如果我做错了什么,我可以学习吗?

[修改和测试]

  1. 我尝试将相同的文件 people.txt 复制到一个 .../csv/ 文件夹下的 people1.csv peopele2.csv people3.csv
  2. val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
  3. csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()

我得到了这个:

因此,我可能认为这不是数据readstream()问题...

0 投票
1 回答
2067 浏览

scala - 使用 foreach 附加到文本文件的 Spark 结构化流

我想使用结构化流向文本文件附加行。此代码导致SparkException: Task not serializable. 我认为toDF是不允许的。我怎样才能让这段代码工作?

0 投票
1 回答
1592 浏览

python - 如何在 PySpark 2.1.0 中的事件时间窗口上定义 UDAF

问题定义

我正在编写一个 Python 应用程序,它在一系列值上滑动一个窗口,每个值都有一个时间戳。我想对滑动窗口中的值应用一个函数,以便从 N 个最新值中计算分数,如图所示。我们已经使用 Python 库实现了该功能以利用 GPU。

我发现 Apache Spark 2.0 附带结构化流,它支持事件时间的窗口操作。如果您想从 .csv 文件中读取有限的记录序列,并希望在这样的滑动窗口中计算记录,您可以在 PySpark 中使用以下代码:

但是,我想在滑动窗口上应用预定义聚合函数以外的 UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的聚合函数只有 avg、max、min、求和,并计数。

还不支持?如果是这样,PySpark 什么时候支持它?

https://stackoverflow.com/a/32750733/1564381表明可以在 Java 或 Scala 中定义 UserDefinedAggregateFunction,然后在 PySpark 中调用它。看起来很有趣,但我想将我自己的 Python 函数应用于滑动窗口中的值。我想要一种纯粹的 Pythonic 方式。

ps 让我知道 Python 中除 PySpark 之外的任何可以解决此类问题的框架(在流上滑动的窗口上应用 UDAF)。

0 投票
3 回答
3324 浏览

apache-spark - 如何将流数据集写入 Kafka?

我正在尝试对主题数据进行一些丰富。因此,使用 Spark 结构化流从 Kafka 接收器读取回 Kafka。

但我得到一个例外:

任何解决方法?

0 投票
1 回答
495 浏览

apache-spark - SparkSql - Join Query execution throws 'object is not an instance of declaring class'

I am executing query on SparkSession which throws Object is not an instance of declaring class, below is the code after which

The exception is during method count()

I have also observed if the query is simple select col from table1, that runs fine but join query above causes error.

I am using Spark 2.1 and to create SparkSession I do below

More stack trace below :-

0 投票
1 回答
744 浏览

apache-spark - 未调用结构化流 2.1.0 Foreach

我正在尝试在纱线集群上使用 Kafka 测试结构化流的 foreach 并拥有这段小代码:

当我运行这个程序时,没有调用 open()。我已经比较了其他线程的其他示例,但我无法找出缺少的内容。在 foreach 之后有一个 start() 调用,这似乎与此处的文档相匹配。

我可以流式传输到文件或控制台,但无法在 foreach 中获得任何调用。

关于在哪里看的任何建议?TIA。

0 投票
1 回答
1701 浏览

apache-spark - 如何使用 from_json 允许消息具有不同的字段?

我正在尝试使用 Spark Structured Streaming 处理来自 Kafka 的数据。提取数据的代码如下:

ds是一个 DataFrame,其中包含从 Kafka 消耗的数据。

当我尝试读取 JSON 以进行更快的查询时,问题就来了。来自的功能org.apache.spark.sql.functions from_json()要求强制提供模式。如果消息有一些不同的字段怎么办?

0 投票
2 回答
1403 浏览

apache-spark - 如何编写数据集进入关于 Spark Structured Streaming - Java8 的 kafka 输出主题

我正在尝试ForeachWriter在 Spark 2.1 中使用接口它的接口,但我不能使用它。