2

我目前正在开发一个 ETL 管道,该管道使用 BigQuery 存储暂存数据,然后使用 Dataprep 转换数据并将其存储在新的 BigQuery 表中以用于生产。

我们一直在寻找最经济有效的方法来对一小部分数据应用这些转换时遇到问题,通常只有从暂存数据表中当前最大日期开始的最后 X 天。例如,我们需要计算暂存数据中的最大可用日期,然后检索从该日期开始的过去 3 天内的所有行。不幸的是,我们不能依赖临时数据中的“最大日期”始终是最新的(这些数据是从质量和可靠性不同的第三方 API 引入的)。

起初,我尝试通过获取最大日期直接在 Dataprep 中应用这些转换,使用 DATEDIFF 创建一个比较列,然后丢弃比该“最大日期”早 3 天以上的行。事实证明,这在成本方面非常耗时且效率低下。

接下来我们尝试过滤 BigQuery 视图中的数据,然后将其用作 Dataprep 流的初始数据集(数据将在 Dataprep 应用任何转换之前进行预过滤)。我们首先尝试在 BigQuery 中动态执行此操作,如下所示:

WITH latest_partitiontime AS (SELECT _PARTITIONTIME as pt FROM 
`{project}.{dataset}.{table}`
GROUP BY _PARTITIONTIME
ORDER BY _PARTITIONTIME DESC
LIMIT 1)

SELECT {columns}
FROM `{project}.{dataset}.{table}`
WHERE _PARTITIONTIME >= (SELECT pt FROM latest_partitiontime)

但是在预览查询的 GB/估计成本时,它似乎非常低效且昂贵。

我们尝试的下一件事是对日期进行硬编码,由于某种原因,这更便宜/更快:

SELECT {columns}
FROM `{project}.{dataset}.{table}`
WHERE _PARTITIONTIME >= '2018-08-08'

所以我们目前的计划是为每个表维护一个视图,并在每次暂存数据成功完成时通过 Python SDK 更新视图 SQL 中的硬编码日期(https://cloud.google.com/bigquery/docs/managing - 意见)。

感觉就像我们可能会错过一个更简单/更有效的解决方案来解决这个问题。所以我想问:

  • 在 Dataprep 或 BigQuery 中按日期执行此初始过滤是否更具成本效益?
  • 过滤所选产品中数据的最具成本效益的方法是什么?
4

1 回答 1

3

你熟悉标准 SQL 的MERGE语句和发布的集群特性吗?这实际上可以合并您的数据,您可以进一步自定义它以仅读取某些分区。

手册中的示例:

MERGE dataset.DetailedInventory T
USING dataset.Inventory S
ON T.product = S.product
WHEN NOT MATCHED AND quantity < 20 THEN
  INSERT(product, quantity, supply_constrained, comments)
  VALUES(product, quantity, true, ARRAY<STRUCT<created DATE, comment STRING>>[(DATE('2016-01-01'), 'comment1')])
WHEN NOT MATCHED THEN
  INSERT(product, quantity, supply_constrained)
  VALUES(product, quantity, false)

提示:您可以按 分区null,并且仅利用“集群级别”

于 2018-08-10T12:38:03.510 回答