我有一个长时间运行的管道,其中包含一些失败的项目(在流程结束时未加载的项目,因为它们未能通过数据库验证或类似的东西)。
我想重新运行管道,但只处理上次运行时导入失败的项目。
我有一个系统可以检查每个项目 ID(我从外部来源收到的)。我在我的装载机中执行此检查。如果我已经在数据库中拥有该项目 ID,我将跳过在数据库中加载/插入该项目。
这很好用。但是,它很慢,因为我对这些项目中的每一个都进行了提取转换加载,然后才在加载时查询数据库(每个项目一个查询)并比较项目 ID。
我想尽快过滤掉这些记录。如果我在变压器中做,我只能按项目再做一次。看起来提取器可能是这个地方,或者我可以将记录分批传递给转换器,然后过滤+分解(第一个)转换器中的项目。
这里有什么更好的方法?
我也在考虑我的提取器的可重用性,但我想我可以接受一个提取器既提取又过滤的事实。我认为最好的解决方案是能够链接多个提取器。然后我会有一个提取数据和另一个过滤数据。
编辑:也许我可以做这样的事情:
already_imported_item_ids = Items.pluck(:item_id)
Kiba.run(
Kiba.parse do
source(...)
transform do |item|
next if already_imported_item_ids.include?(item)
item
end
transform(...)
destination(...)
end
)
我想这可以工作吗?