
I am reading a webcam on OS X, which works fine with this simple script:

import cv2
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print("\n\nBye bye\n")
        break

I now want to read the video in a separate process, for which I have the longer script below, which correctly reads out the video in a separate process on Linux:

import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(floating point is allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int(see multiprocessing.Value)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns stream_function object function
        """

        def stream_function(image, finished):
            """
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing(see multiprocessing.Array)
            :param finished: synchronized wrapper for int(see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frame until we get finished flag set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output",  default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log",  default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps",  default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables(which are synchronised so race condition is excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame
            time.sleep(0.1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break

if __name__ == '__main__':
    main()

This works well on Linux, but on OS X I run into trouble, because it seems unable to do a `.read()` on the created `cv2.VideoCapture(device)` object (stored in the var `self._cap`).

After some searching I found this SO answer, which suggests using Billiard, a replacement for Python's multiprocessing that supposedly has some very useful improvements. So at the top of the file I simply added the import after the previous multiprocessing imports (effectively overriding `multiprocessing.Process`):

from billiard import Process, forking_enable

Right before the instantiation of the `video_process` variable, I use `forking_enable` as follows:

forking_enable(0)  # Supposedly this is all I need for billiard to do its magic
video_process = Process(target=stream, args=(frame, finished))

So with this version (on pastebin) I then ran the file again, which gives me this error:

pickle.PicklingError: Can't pickle <...>: it's not found as __main__.stream_function

A search for that error led me to an SO question with a long list of answers, one of which gave me the suggestion to use the dill serialization library to solve this. That library, however, should be used with the Pathos multiprocessing fork. So I simply tried changing my multiprocessing import line from

from multiprocessing import Array, Value, Process

to

from pathos.multiprocessing import Array, Value, Process

but none of `Array`, `Value` and `Process` seem to exist in the `pathos.multiprocessing` package.

From this point on I am completely lost. I am searching for things of which I barely have enough knowledge, and I don't even know in which direction I need to search or debug anymore.

So is there a soul smarter than me who can help me capture video in a separate process? All tips are welcome!


2 Answers


Your first problem is that you cannot access the webcam in a `fork`ed process. Several issues arise when external libraries are used with `fork`, because the fork operation does not clean up all the file descriptors opened by the parent process, leading to strange behavior. The library is often more robust to this kind of issue on Linux, but sharing an IO object such as `cv2.VideoCapture` between 2 processes is not a good idea.

When you use `billiard.forking_enable` and set it to `False`, you ask the library not to use `fork` to spawn the new processes but rather the `spawn` or `forkserver` methods, which are cleaner as they close all the file descriptors, but also slower to start; this should not be an issue in your case. If you are using python3.4+, you can do this with `multiprocessing.set_start_method('forkserver')` for instance.

When you use one of these methods, the target function and the arguments need to be serialized to be passed to the child process. The serialization is done by default with `pickle`, which has several flaws, as you mentioned, such as being unable to serialize locally defined objects and also `cv2.VideoCapture`. But you can simplify your program to make all the arguments to `Process` picklable. Here is a tentative solution to your problem:

import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Function keeps capturing frames until finished = 1
    :param image: shared numpy array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    """
    shape = capturer.get_size()

    # Define shared variables
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frame until we get finished flag set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.join()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()

I cannot currently test it on a mac, so it might not work out of the box, but there should not be `multiprocessing`-related errors. Some notes:

  • I instantiate the `cv2.VideoCapture` object in the new child and grab the camera, as only one process should read from the camera.
  • Maybe the issue in your first program with `fork` is only due to the shared `cv2.VideoCapture`, and re-creating it in the `stream` function would solve your issue.
  • You cannot pass the numpy wrapper to the child, as it will not share the `mp.Array` buffer (this is really weird and it took me a while to figure out). You need to pass the `Array` explicitly and re-create a wrapper.
  • I assume you are running your code with python3.4+, so I did not use billiard, but using `forking_enable(False)` instead of `set_start_method` should be similar.

Let me know if this works!

Answered 2017-04-30T09:21:21.660

The main challenge with `multiprocessing` is understanding the memory model in the presence of separated memory address spaces.

Python makes things even more confusing, as it abstracts many of these aspects away, hiding several mechanisms under a few innocent-looking APIs.

When you write this logic:

# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()

...

# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start()  # Launch capture process

You are passing to `Process` a `stream_function` which refers to internal components of the `VideoCapture` class (`self.get_size`) but, more importantly, it is not available as a top-level function.

The child process will not be able to re-construct the required object, as all it receives is the name of a function. It tries to look it up in the main module, hence the message:

pickle.PicklingError: Can't pickle <...>: it's not found as __main__.stream_function

The child process is trying to resolve the function in the main module as `__main__.stream_function` and the lookup fails.

My first suggestion would be to change your logic so that you pass the method returning `stream_function` to the child process:

video_process = Process(target=cap.get_stream_function, args=(...))

Nevertheless, you might still run into issues, as you are sharing state between the two processes.

When people approach the multiprocessing paradigm in Python, I usually suggest they think of the processes as if they ran in separate machines. In these cases, it becomes obvious that your architecture is problematic.

I would recommend you separate the responsibilities of the two processes, making sure that one process (the child) deals with the entire capturing of the video and the other (the parent, or a third process) deals with the processing of the frames.

This paradigm is known as the producer-consumer problem, and it fits your system quite well. The video capturing process would be the producer and the other one the consumer. A simple `multiprocessing.Pipe` or `multiprocessing.Queue` would make sure the frames are delivered from the producer to the consumer as soon as they are ready.

Adding an example in pseudocode, as I don't know the video capture APIs. The point is to deal with the entire video capture logic within the producer process, abstracting it from the consumer. The consumer only needs to know it receives frame objects via a pipe.

def capture_video(writer):
    """This runs in the producer process."""
    # The VideoCapture class wraps the video acquisition logic
    cap = VideoCapture()

    while True:
        frame = cap.get_next_frame()  # the method returns the next frame
        writer.send(frame)  # send the new frame to the consumer process

def main():
    reader, writer = multiprocessing.Pipe(False)

    # producer process
    video_process = Process(target=capture_video, args=[writer])
    video_process.start()  # Launch capture process

    while True:
        try:
            frame = reader.recv()  # receive next frame from the producer
            process_frame(frame)
        except KeyboardInterrupt:
            video_process.terminate()
            break

Note how there is no shared state between the processes (no need to share any array). The communication goes through the pipe and is mono-directional, making the logic very simple. As I said above, this logic would work across different machines as well. You would just need to replace the pipe with a socket.

You might want a cleaner termination approach for the producer process. I would suggest you use a `multiprocessing.Event`. Just set it from the parent in the `KeyboardInterrupt` block and check its status in the child at every iteration (`while not event.is_set()`).

Answered 2017-04-26T08:58:50.810