解决问题的一个好方法是使用Spark
插件。要将其添加到您的 Datafusion 实例,请转到HUB -> 插件 -> 搜索 Spark -> 部署插件。然后您可以在分析选项卡上找到它。
为了给你一个如何使用它的例子,我创建了下面的管道:
这条管道基本上是:
- 从 GCS 读取文件。
- 在您的数据中执行排名函数
- 过滤不同分支中rank=1且rank>1的数据
- 将您的数据保存在不同的位置
现在让我们更深入地了解每个组件:
1 - GCS:这是一个简单的 GCS 源。此示例使用的文件具有如下所示的数据
2 - Spark_rank:这是一个 Spark 插件,代码如下。该代码基本上使用您的数据创建了一个临时视图,并且它们应用查询来对您的行进行排名。之后,您的数据将返回到管道。您还可以在下面看到此步骤的输入和输出数据。请注意输出是重复的,因为它被传递到两个分支。
def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
df.createTempView("source")
df.sparkSession.sql("SELECT AccountNumber, Address, Record_date, RANK() OVER (PARTITION BY accountNumber ORDER BY record_date DESC) as rank FROM source")
}
3 - Spark2和Spark3:与下面的步骤一样,此步骤使用 Spark 插件来转换数据。Spark2 使用下面的代码仅获取 rank = 1 的数据
def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
df.createTempView("source_0")
df.sparkSession.sql("SELECT AccountNumber, Address, Record_date FROM
source_0 WHERE rank = 1")
}
Spark3 使用以下代码获取 rank > 1 的数据:
def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
df.createTempView("source_1")
df.sparkSession.sql("SELECT accountNumber, address, record_date FROM source_1 WHERE rank > 1")
}
4 - GCS2和GCS3:最后,在这一步中,您的数据会再次保存到 GCS 中。