0

我正在使用prefect并定义一个流程来使用 cosmos db 插入文档。

问题是query_items()调用是可迭代的,对于大型容器,没有办法将所有条目保存在内存中。

我相信我的问题可以简化为:

  • given an iterator, how can I create batches to be processed (mapped) in a prefect flow?

例子:

def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

上面的代码或类似的代码会给我一个错误:

prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
4

1 回答 1

0

您收到此错误是因为您没有定义big_iterable_function_i_cannot_changetask. prefect实际上并没有flow直接执行 a。flow用于制作schedule, (用说法dask)——然后用于执行流程(据我所知)。仅当与dask executorprefect一起使用时才会发生并行化。

这是我对你的看法flow。但是,如果您无法将任务装饰器添加big_iterable_function_i_cannot_change到 atask中,请将其包装在任务中。最后 - 不确定您是否可以将生成器传递给映射任务。

import prefect
from prefect import Flow, Parameter, task

@task
def big_iterable_function_i_cannot_change():
    return range(5) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched") as flow:
    itter_res = big_iterable_function_i_cannot_change()
    post_process_res = some_prefect_batching_magic.map(itter_res)

flow.visualize()
state = flow.run()


flow.visualize(flow_state=state)
于 2021-03-17T15:23:49.600 回答