1

我有一个长时间运行的管道,其中包含一些失败的项目(在流程结束时未加载的项目,因为它们未能通过数据库验证或类似的东西)。

我想重新运行管道,但只处理上次运行时导入失败的项目。

我有一个系统可以检查每个项目 ID(我从外部来源收到的)。我在我的装载机中执行此检查。如果我已经在数据库中拥有该项目 ID,我将跳过在数据库中加载/插入该项目。

这很好用。但是,它很慢,因为我对这些项目中的每一个都进行了提取转换加载,然后才在加载时查询数据库(每个项目一个查询)并比较项目 ID。

我想尽快过滤掉这些记录。如果我在变压器中做,我只能按项目再做一次。看起来提取器可能是这个地方,或者我可以将记录分批传递给转换器,然后过滤+分解(第一个)转换器中的项目。

这里有什么更好的方法?

我也在考虑我的提取器的可重用性,但我想我可以接受一个提取器既提取又过滤的事实。我认为最好的解决方案是能够链接多个提取器。然后我会有一个提取数据和另一个过滤数据。

编辑:也许我可以做这样的事情:

already_imported_item_ids = Items.pluck(:item_id)

Kiba.run(
  Kiba.parse do
    source(...)

    transform do |item|
      next if already_imported_item_ids.include?(item)

      item
    end

    transform(...)
    destination(...)
  end
)

我想这可以工作吗?

4

1 回答 1

1

一些提示:

  1. 管道中的越高(越早)越好。如果你能找到一种从源头上直接过滤掉的方法,成本会更低,因为你根本不需要操纵数据。

  2. 如果您的规模足够小,您可以在一个块的开头仅加载完整的 id 列表pre_process(主要是您在代码示例中想到的),然后在源代码之后进行比较。显然它不会无限扩展,但它可以工作很长时间,具体取决于您的数据集大小。

  3. 如果您需要更大的规模,我建议您使用缓冲转换(将 N 行分组)来实现单个 SQL 查询以验证目标数据库中是否存在所有 N 行 ID,或者使用组行然后确实爆炸。

于 2020-07-03T12:36:50.703 回答