0

我有代表视频帧的 dask 数组,并且想要创建多个视频文件。我正在使用imageio允许我将帧“附加”到 ffmpeg 子进程的库。所以我可能有这样的事情:

my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]

所以每个内部列表代表一个视频(或产品)的帧。我正在寻找发送/提交要计算的帧的最佳方法,同时还在imageio它们完成时(按顺序)写入帧。更复杂的是,上面的内部列表实际上是生成器,可以是 100 或 1000 帧。还要记住,由于imageio工作方式的原因,我认为它需要存在于一个进程中。这是我迄今为止工作的简化版本:

for frame_arrays in frames_to_write:
    # 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
    future_list = _client.compute(frame_arrays)
    # key -> future
    future_dict = dict(zip(frame_keys, future_list))

    # write the current frame
    # future -> key
    rev_future_dict = {v: k for k, v in future_dict.items()}
    result_iter = as_completed(future_dict.values(), with_results=True)
    for future, result in result_iter:
        frame_key = rev_future_dict[future]
        # get the writer for this specific video and add a new frame
        w = writers[frame_key]
        w.append_data(result)

这可行,我的实际代码从上面重新组织以在编写当前帧时提交下一帧,所以我认为有一些好处。我正在考虑一个解决方案,用户说“我想一次处理 X 帧”,所以我发送 50 帧,写入 50 帧,再发送 50 帧,写入 50 帧等。

在研究了一段时间后我的问题:

  1. 的数据何时存在result于本地内存中?什么时候被迭代器返回或者什么时候完成?
  2. 是否可以使用 dask-core 线程调度程序执行此类操作,以便用户不必安装分布式?
  3. 是否可以根据工作人员的数量来调整发送的帧数?
  4. 有没有办法发送 dask 数组的字典和/或使用 as_completed 并包含“frame_key”?
  5. 如果我加载整个系列的帧并将它们提交给客户端/集群,我可能会杀死调度程序,对吗?
  6. 是使用get_client()后跟获取客户端Client()ValueError首选方式(如果用户未提供)?
  7. 是否有可能在工人可用时提供 dask/distributed 一个或多个迭代器?
  8. 我是不是很笨?过于复杂了?

注意:这是我不久前对这个问题的一种扩展,但略有不同。

4

1 回答 1

0

在遵循了这里的很多例子之后,我得到了以下信息:

    try:
        # python 3
        from queue import Queue
    except ImportError:
        # python 2
        from Queue import Queue
    from threading import Thread

    def load_data(frame_gen, q):
        for frame_arrays in frame_gen:
            future_list = client.compute(frame_arrays)
            for frame_key, arr_future in zip(frame_keys, future_list):
                q.put({frame_key: arr_future})
        q.put(None)

    input_q = Queue(batch_size if batch_size is not None else 1)
    load_thread = Thread(target=load_data, args=(frames_to_write, input_q,))
    remote_q = client.gather(input_q)
    load_thread.start()

    while True:
        future_dict = remote_q.get()
        if future_dict is None:
            break

        # write the current frame
        # this should only be one element in the dictionary, but this is
        # also the easiest way to get access to the data
        for frame_key, result in future_dict.items():
            # frame_key = rev_future_dict[future]
            w = writers[frame_key]
            w.append_data(result)
        input_q.task_done()

    load_thread.join()

这回答了我的大部分问题,并且似乎按照我想要的方式工作。

于 2019-02-09T14:27:39.663 回答