我有代表视频帧的 dask 数组,并且想要创建多个视频文件。我正在使用imageio
允许我将帧“附加”到 ffmpeg 子进程的库。所以我可能有这样的事情:
my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]
所以每个内部列表代表一个视频(或产品)的帧。我正在寻找发送/提交要计算的帧的最佳方法,同时还在imageio
它们完成时(按顺序)写入帧。更复杂的是,上面的内部列表实际上是生成器,可以是 100 或 1000 帧。还要记住,由于imageio
工作方式的原因,我认为它需要存在于一个进程中。这是我迄今为止工作的简化版本:
for frame_arrays in frames_to_write:
# 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
future_list = _client.compute(frame_arrays)
# key -> future
future_dict = dict(zip(frame_keys, future_list))
# write the current frame
# future -> key
rev_future_dict = {v: k for k, v in future_dict.items()}
result_iter = as_completed(future_dict.values(), with_results=True)
for future, result in result_iter:
frame_key = rev_future_dict[future]
# get the writer for this specific video and add a new frame
w = writers[frame_key]
w.append_data(result)
这可行,我的实际代码从上面重新组织以在编写当前帧时提交下一帧,所以我认为有一些好处。我正在考虑一个解决方案,用户说“我想一次处理 X 帧”,所以我发送 50 帧,写入 50 帧,再发送 50 帧,写入 50 帧等。
在研究了一段时间后我的问题:
- 的数据何时存在
result
于本地内存中?什么时候被迭代器返回或者什么时候完成? - 是否可以使用 dask-core 线程调度程序执行此类操作,以便用户不必安装分布式?
- 是否可以根据工作人员的数量来调整发送的帧数?
- 有没有办法发送 dask 数组的字典和/或使用 as_completed 并包含“frame_key”?
- 如果我加载整个系列的帧并将它们提交给客户端/集群,我可能会杀死调度程序,对吗?
- 是使用
get_client()
后跟获取客户端Client()
的ValueError
首选方式(如果用户未提供)? - 是否有可能在工人可用时提供 dask/distributed 一个或多个迭代器?
- 我是不是很笨?过于复杂了?
注意:这是我不久前对这个问题的一种扩展,但略有不同。