0

我想在 Data Fusion 中对我的数据实现 rank 或分区 row_num 函数,但我没有找到任何插件可以这样做。

有没有办法做到这一点?

我想实现以下,

在此处输入图像描述

假设我有上面的数据,现在我想根据 AccountNumber 对数据进行分组,并将最近的记录发送到一个接收器并休息到其他接收器。所以从上面的数据来看,

Sink1 预计有,

在此处输入图像描述

水槽 2 ,

在此处输入图像描述

我计划通过按 AccountNumber 应用 rank 或 row_number 分区并按 Record_date desc 之类的功能排序并将具有 rank=1 或 row_num=1 的记录发送到一个接收器并休息到另一个接收器来进行这种隔离。

4

1 回答 1

1

解决问题的一个好方法是使用Spark 插件。要将其添加到您的 Datafusion 实例,请转到HUB -> 插件 -> 搜索 Spark -> 部署插件。然后您可以在分析选项卡上找到它。

为了给你一个如何使用它的例子,我创建了下面的管道:

在此处输入图像描述

这条管道基本上是:

  1. 从 GCS 读取文件。
  2. 在您的数据中执行排名函数
  3. 过滤不同分支中rank=1且rank>1的数据
  4. 将您的数据保存在不同的位置

现在让我们更深入地了解每个组件:

1 - GCS:这是一个简单的 GCS 源。此示例使用的文件具有如下所示的数据

在此处输入图像描述

2 - Spark_rank:这是一个 Spark 插件,代码如下。该代码基本上使用您的数据创建了一个临时视图,并且它们应用查询来对您的行进行排名。之后,您的数据将返回到管道。您还可以在下面看到此步骤的输入和输出数据。请注意输出是重复的,因为它被传递到两个分支。

      def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
          df.createTempView("source")
          df.sparkSession.sql("SELECT AccountNumber, Address, Record_date, RANK() OVER (PARTITION BY accountNumber ORDER BY record_date DESC) as rank FROM source")
    }

在此处输入图像描述

3 - Spark2Spark3:与下面的步骤一样,此步骤使用 Spark 插件来转换数据。Spark2 使用下面的代码仅获取 rank = 1 的数据

    def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
      df.createTempView("source_0")
      df.sparkSession.sql("SELECT AccountNumber, Address, Record_date FROM 
    source_0 WHERE rank = 1")
    }

Spark3 使用以下代码获取 rank > 1 的数据:

    def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
      df.createTempView("source_1")
      df.sparkSession.sql("SELECT accountNumber, address, record_date FROM source_1 WHERE rank > 1")
    }

4 - GCS2GCS3:最后,在这一步中,您的数据会再次保存到 GCS 中。

于 2020-09-23T16:52:50.657 回答