0

我使用 Spark 1.6 并在两个数据帧上进行内部连接,如下所示:

val filtergroup = metric
  .join(filtercndtns, Seq("aggrgn_filter_group_id"), inner)
  .distinct()

aggrgn_filter_group_id但我不断在列中得到重复的值。你能建议一些解决方案吗?

4

1 回答 1

0

火花 < 2.0

考虑distinct在具有要删除重复的列的数据集上,然后在列上进行内部连接。

// don't use distinct yet
val filtergroup = metric
  .join(filtercndtns, Seq("aggrgn_filter_group_id"), "inner")

// take unique aggrgn_filter_group_ids
val uniqueFilterGroups = filtergroup
  .select("aggrgn_filter_group_id")
  .distinct

// Inner join to remove duplicates from the source dataset
filtergroup.join(uniqueFilterGroups, Seq("aggrgn_filter_group_id"), "inner")

select代价是使用distinctand执行额外的操作join,但应该会给您预期的结果。

火花 >= 2.0

以下解决方案仅适用于支持运算符的Spark 2.0+dropDuplicates ,并允许仅考虑列的子集删除重复项。

引用文档

distinct(): Dataset[T] 返回一个新的 Dataset,它只包含来自这个 Dataset 的唯一行。这是 的别名dropDuplicates

distinct或者dropDuplicates只是删除比较每一列的行重复项。

如果您对特定列感兴趣,则应使用其中之一dropDuplicates,例如

val filtergroup = metric
  .join(filtercndtns, Seq("aggrgn_filter_group_id"), "inner")
  .dropDuplicates("aggrgn_filter_group_id")

当您指定一列或一组列时,dropDuplicates返回一个删除重复行的新数据集,仅考虑列的子集。

于 2018-07-08T17:42:44.433 回答