0

我已经确定我的挂起工作确实在其连接上存在偏差

我可以使用哪些技术来使我的工作仍然成功?

我的代码如下所示:

from transforms.api import Input, Output, transform


@transform(
  my_output=Output("/path/to/my/output"),
  left_input=Input("/path/to/my/left_input"),
  right_input=Input("/path/to/my/right_input"),
)
def my_compute_function(my_output, left_input, right_input):
  left_df = left_input.dataframe()
  right_df = right_input.dataframe()

  output_df = left_df.join(right_df, on=["my_joint_column"])
  my_output.write_dataframe(output_df)

我可以看到一项任务特别需要很长时间:

慢任务

4

1 回答 1

1

您有几个选择,具体取决于您的密钥分配的正确性。

您必须验证的第一件事是:

密钥的分配实际上是否正确?即每个键的重复行实际上是否有效并且需要对其进行操作?

空值或其他此类无效键出现在您的数据中是很常见的,值得验证这些是否需要过滤掉,或仅通过选择最新版本进行合并(这通常称为最大行或最小行操作,即对于每个键,选择在其他列上具有最大值的键,例如时间戳列)。

假设当前的密钥实际上是有效的并且需要被操作,你接下来必须问:

连接的一侧是否明显小于另一侧?

这通常意味着连接的右侧的键数是左侧的 1/10。如果这是真的,您可以尝试Salting the Join。值得注意的是,大小差异不是数据集中总行数的函数(尽管这可能是一种快速而肮脏的估计方法),而是应该将其视为键之间的计数差异加入,你是否应该加入。您可以使用此处的技术获取每个键的计数,并且可以通过除以 df1_COUNTdf2_COUNT不是相乘来轻松计算比例差异。

如果连接的右侧不明显小于左侧,则:

您有一个大型联接,两侧的行数相似。您必须增加 Executor 内存以允许行适合内存

这意味着您必须将配置文件应用于您的转换,将 Executor 内存增加到其当前值以上(可以在注明 AQE 的同一页面上找到

于 2022-02-23T18:31:33.233 回答