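I'm using prefect and defining a flow that inserts documents using cosmos db.
The problem is that the query_items() call returns an iterable, and for large containers there is no way to hold all the entries in memory.
I believe my problem can be reduced to:
Given an iterator, how can I create batches to be processed (mapped) in a prefect flow?
Example: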
from prefect import Flow, task

def big_iterable_function_i_cannot_change():
    yield from range(1000000)  # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass

with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())
The code above, or something similar to it, gives me an error:
prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
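To make clear what I mean by "batches", here is a minimal plain-Python sketch with no prefect involved. The helper name `batched` and the batch size of 1000 are my own placeholders, not anything from prefect or the cosmos db SDK. It only shows the lazy chunking I would like to happen; it does not show how to wire it into a flow, which is the part I'm asking about.

```python
from itertools import islice


def batched(iterable, batch_size):
    """Lazily yield lists of up to batch_size items (hypothetical helper)."""
    iterator = iter(iterable)
    while True:
        # islice pulls at most batch_size items, so only one batch
        # is held in memory at a time.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def big_iterable_function_i_cannot_change():
    yield from range(1000000)  # some large amount of work


# Only the first 1000 items are consumed from the generator here.
first_batch = next(batched(big_iterable_function_i_cannot_change(), 1000))
```

Ideally each such batch would become one mapped task run, instead of the whole iterable being materialized up front.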