1

我是 Rapids 的新手,无法理解支持的操作。

我有以下格式的数据:

+------------+----------+
|        kmer|source_seq|
+------------+----------+
|TGTCGGTTTAA$|         4|
|ACCACCACCAC$|         8|
|GCATAATTTCC$|         1|
|CCGTCAAAGCG$|         7|
|CCGTCCCGTGG$|         6|
|GCGCTGTTATG$|         2|
|GAGCATAGGTG$|         5|
|CGGCGGATTCT$|         0|
|GGCGCGAGGGT$|         3|
|CCACCACCAC$A|         8|
|CACCACCAC$AA|         8|
|CCCAAAAAAAAA|         0|
|AAGAAAAAAAAA|         5|
|AAGAAAAAAAAA|         0|
|TGTAAAAAAAAA|         0|
|CCACAAAAAAAA|         8|
|AGACAAAAAAAA|         7|
|CCCCAAAAAAAA|         0|
|CAAGAAAAAAAA|         5|
|TAAGAAAAAAAA|         0|
+------------+----------+

我正在尝试使用以下代码找出哪些“kmer”具有哪些“source_seq”:

val w = Window.partitionBy("kmer")
x.withColumn("source_seqs", collect_list("source_seq").over(w))

// Result is something like this:
+------------+----------+-----------+                                           
|        kmer|source_seq|source_seqs|
+------------+----------+-----------+
|AAAACAAGACCA|         2|        [2]|
|AAAACAAGCAGC|         4|        [4]|
|AAAACCACGAGC|         3|        [3]|
|AAAACCGCCAAA|         7|        [7]|
|AAAACCGGTGTG|         1|        [1]|
|AAAACCTATATC|         5|        [5]|
|AAAACGACTTCT|         6|        [6]|
|AAAACGCGCAAG|         3|        [3]|
|AAAAGGCCTATT|         7|        [7]|
|AAAAGGCGTTCG|         3|        [3]|
|AAAAGGCTGTGA|         1|        [1]|
|AAAAGGTCTACC|         2|        [2]|
|AAAAGTCGAGCA|         7|     [7, 0]|
|AAAAGTCGAGCA|         0|     [7, 0]|
|AAAATCCGATCA|         0|        [0]|
|AAAATCGAGCGG|         0|        [0]|
|AAAATCGTTGAA|         7|        [7]|
|AAAATGGACAAG|         1|        [1]|
|AAAATTGCACCA|         3|        [3]|
|AAACACCGCCGT|         3|        [3]|
+------------+----------+-----------+

Spark Rapids 支持的操作符文档提到collect_list仅受窗口支持,据我所知,这是我在代码中所做的。

但是,查看查询计划,很容易看出collect_list不是由 GPU 执行的:

scala> x.withColumn("source_seqs", collect_list("source_seq").over(w)).explain
== Physical Plan ==
Window [collect_list(source_seq#302L, 0, 0) windowspecdefinition(kmer#301, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_source#658], [kmer#301]
+- GpuColumnarToRow false
   +- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
      +- GpuCoalesceBatches RequireSingleBatch
         +- GpuShuffleCoalesce 2147483647
            +- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1496]
               +- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>

与具有不同功能的类似查询不同,我们可以看到使用 GPU 执行的窗口:

scala> x.withColumn("min_source", min("source_seq").over(w)).explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuWindow [gpumin(source_seq#302L) gpuwindowspecdefinition(kmer#301, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS max_source#648L], [kmer#301], false
   +- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
      +- GpuCoalesceBatches RequireSingleBatch
         +- GpuShuffleCoalesce 2147483647
            +- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1431]
               +- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>

我是否以某种方式错误地理解了支持的操作文档,或者我是否以错误的方式编写了代码?对此的任何帮助将不胜感激。

4

3 回答 3

2

是的,正如 Mithun 提到的,从 0.5 版本开始, spark.rapids.sql.expression.CollectList 开始为真。但是在 0.4 版本中它是错误的: https ://github.com/NVIDIA/spark-rapids/blob/branch-0.4/docs/configs.md

这是我在 0.5+ 版本上测试的计划:

val w = Window.partitionBy("name")
val resultdf=dfread.withColumn("values", collect_list("value").over(w))
resultdf.explain

== Physical Plan ==
GpuColumnarToRow false
+- GpuWindow [collect_list(value#134L, 0, 0) gpuwindowspecdefinition(name#133, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS values#138], [name#133], false
   +- GpuCoalesceBatches RequireSingleBatch
      +- GpuSort [name#133 ASC NULLS FIRST], false, com.nvidia.spark.rapids.OutOfCoreSort$@28e73bd1
         +- GpuShuffleCoalesce 2147483647
            +- GpuColumnarExchange gpuhashpartitioning(name#133, 200), ENSURE_REQUIREMENTS, [id=#563]
               +- GpuFileGpuScan csv [name#133,value#134L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,value:bigint>
于 2021-05-20T17:45:47.187 回答
2

肯尼。请问rapids-4-spark您使用的是什么版本的插件,以及 Spark 的版本?

默认情况下,GPU 的初始实现COLLECT_LIST()被禁用,因为它的行为与 Spark 的 wrt null 值不匹配。(GPU 版本在聚合数组行中保留空值,而 Spark 将它们删除。)编辑:在 0.5 版本中更正了该行为。

如果您的聚合列中没有空值(并且使用的是rapids-4-spark0.4),您可以尝试通过设置来启用运算符spark.rapids.sql.expression.CollectList=true

一般来说,可以通过设置来检查算子没有在 GPU 上运行的原因spark.rapids.sql.explain=NOT_ON_GPU。那应该将原因打印到控制台。

如果您在使用该插件时仍然遇到困难或不正确的行为rapids-4-spark,请随时在项目的 GitHub上提出错误。我们很乐意进一步调查。

于 2021-05-20T17:37:26.833 回答
0

即将发布的 21.08 版本将支持用于聚合和窗口化的 collect_set(RAPIDS Spark 正在转向日历版本控制)。

于 2021-05-27T05:11:40.440 回答