2

问题定义

我正在编写一个 Python 应用程序,它在一系列值上滑动一个窗口,每个值都有一个时间戳。我想对滑动窗口中的值应用一个函数,以便从 N 个最新值中计算分数,如图所示。我们已经使用 Python 库实现了该功能以利用 GPU。

我发现 Apache Spark 2.0 附带结构化流,它支持事件时间的窗口操作。如果您想从 .csv 文件中读取有限的记录序列,并希望在这样的滑动窗口中计算记录,您可以在 PySpark 中使用以下代码:

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double') \

sqlContext = SQLContext(spark)
lines = sqlContext \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///'+getcwd()+'/csv')

windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'}) 

query = windowedCount \
   .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

但是,我想在滑动窗口上应用预定义聚合函数以外的 UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的聚合函数只有 avg、max、min、求和,并计数。

还不支持?如果是这样,PySpark 什么时候支持它?

https://stackoverflow.com/a/32750733/1564381表明可以在 Java 或 Scala 中定义 UserDefinedAggregateFunction,然后在 PySpark 中调用它。看起来很有趣,但我想将我自己的 Python 函数应用于滑动窗口中的值。我想要一种纯粹的 Pythonic 方式。

ps 让我知道 Python 中除 PySpark 之外的任何可以解决此类问题的框架(在流上滑动的窗口上应用 UDAF)。

4

1 回答 1

1

在 Spark <2.3 中,您不能这样做。

对于 Spark >= 2.3,这对于分组数据是可能的,但对于使用“PySpark UDAFs with Pandas”的 Windows 尚不可行。

目前,PySpark 无法在 Windows 上运行 UserDefined 函数。

这是一个很好描述的 SO 问题:Applying UDFs on GroupedData in PySpark (with running python example)

这是添加此功能的 JIRA 票证 - https://issues.apache.org/jira/browse/SPARK-10915

于 2018-03-06T19:07:47.063 回答