
I am using pafy to stream a set of YouTube videos, with the aim of combining them (split-screen style) and displaying them as one video. It works, but the frame rate gets very slow with more than two videos, because a frame is fetched from each stream in turn; when I try 9 videos (for a 3x3 stitch), fetching the frames takes 0.1725 seconds (too slow).

I figured the best way to reduce this is to fetch the streams in parallel, with multiprocessing.

I tried using Pipe and multiprocessing, but I get an EOFError: Ran out of input.

See the code below; comment/uncomment the frames = lines to switch between the working-but-slow method and my multiprocessing attempt.

import multiprocessing
import cv2
import numpy as np
import pafy
import typing
import timeit

urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",

    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",

    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080


def main():
    streams = [pafy.new(url).getbest() for url in urls]

    videos = [cv2.VideoCapture() for _ in streams]

    [video.open(best.url) for video, best in zip(videos, streams)]

    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        frames = get_frames(videos)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        cv2.imshow('Video', dst)

        if cv2.waitKey(1) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)

        continue

    [video.release() for video in videos]
    cv2.destroyAllWindows()


def get_frames(videos):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames


def get_frame(video, send_end):
    send_end.send(video.read()[1])
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))


def merge_frames(frames: typing.List[np.ndarray]):
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    return np.vstack(rows)


if __name__ == '__main__':
    main()
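The grid logic in merge_frames can be sanity-checked without OpenCV; in this sketch plain lists stand in for frame arrays, and the hstack/vstack calls are reduced to grouping:

```python
import math

def grid_rows(frames):
    # Same indexing as merge_frames: width = ceil(sqrt(n)),
    # then rows of `width` consecutive frames.
    width = math.ceil(math.sqrt(len(frames)))
    return [frames[width * row: width * row + width] for row in range(width)]

print(grid_rows(list(range(9))))  # → [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```

With a count that is not a perfect square the last row comes up short, so np.hstack/np.vstack would fail on mismatched shapes; that is why square counts (2x2, 3x3, ...) of equally sized streams merge cleanly.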

1 Answer


Interesting application! Regarding the error: I ran your code, and the message is that it cannot pickle the VideoCapture object (see the links below), which is probably why the receiving pipe is empty. The two processes produce two errors: first the pickle one, then the EOF.

Edit #2: I managed to run it with one process per video stream, etc.:


Regarding performance: I first ran it without merging the images (I had to fix some details) to check that frames were being received. With 3 and 4 streams, displayed in windows separate from the receiving processes, it played very fast, faster than real time (tested with 3-4 streams). I think the merging and resizing for display are slow: for 4 streams (4x1280x720) the merged picture is 2560x1440. In my case it was then resized to fit the screen.

Thanks for sharing the question and that library etc.!

(BTW, I initially tried with a Lock as well, but it turned out to be unnecessary. The code needs some cleanup of the experiments. Also, the current implementation may not be synchronized per frame for each stream, since it doesn't join on every frame the way your original example did; that version created new processes to grab one frame from each stream and then merged them.)

The CPU load is mostly in the main process (a 4-core CPU, so max = 25% per instance):


Sometimes:

0.06684677699999853
0.030737616999999773
1.2829999995744856e-06
LEN(FRAMES)= 9
0.06703700200000284
0.030708104000002123
6.409990EN7997

The waitKey in the main loop can be tuned.

Code:

https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py

# Merging Youtube streams with pafy, opencv and multithreading
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code, properly close VideoCapture in the processes

import multiprocessing #Process, Lock
from multiprocessing import Lock # Not needed
import cv2
import numpy as np
import pafy
import typing
import timeit
import time

urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",

    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",

    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A", 
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]

# Merging seems to require equal number of sides, so 2x2, 3x3 etc. The  resolutions should be the same.
'''
[    
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE",   
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
]
'''

width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080

streams = []
#bestStreams = []

def main():
    global bestStreams
    streams = [pafy.new(url).getbest() for url in urls]
    print(streams)
    #[bestStreams for best in streams]
    #print(bestStreams)
    cv2.waitKey(0)
    videos = [cv2.VideoCapture() for _ in streams]
    bestURLS = [] 
    #[video.open(best.url) for video, best in zip(videos, streams)]  # Opened per process
    [bestURLS.append(best.url) for best in streams]
    
    #[ for video, best in zip(videos, streams)]
    print(bestURLS)
    cv2.waitKey(0)
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    LOCK = Lock()
    #proc = get_framesUL(bestStreams, LOCK)
    #proc, pipes = get_framesULJ(bestStreams, LOCK)
    proc, pipes = get_framesULJ(bestURLS, LOCK)     
    print("PROC, PIPES", proc, pipes)
    #cv2.waitKey(0)
    frames = []
    numStreams = len(streams)
    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        #frames = get_frames(videos, LOCK)
        #frames = get_framesUL(streams, LOCK)
        
        
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        
        frames = [x.recv() for x in pipes]
        lf = len(frames)
        print("LEN(FRAMES)=", lf);
        #if lf<3: time.sleep(3); print("LEN(FRAMES)=", lf); #continue #Else merge and show
        #proc.join()
        #elif lf==3: frames = [x.recv() for x in pipes]
                
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
         
        start_time = timeit.default_timer()      
        #if cv2!=None:
        try:
          cv2.imshow('Video', dst)
        except: print("Skip")
        #cv2.waitKey(1)  

        if cv2.waitKey(20) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)

        continue
        
    for p in proc:  # proc holds the job list returned by get_framesULJ
        p.join()
        
    # [video.release() for video in videos] # Per process
    cv2.destroyAllWindows()



def get_framesULJ(videosURL, L): #return the processes, join in main and read the frames there
    # frames = [video.read()[-1] for video in videos]
    print("get_framesULJ:",videosURL)    
    jobs = []
    pipe_list = []
    #print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()
        #cv2.waitKey(0)

    #for proc in jobs:
    #    proc.join()

    #frames = [x.recv() for x in pipe_list]
    #return frames
    #cv2.waitKey(0)
    return jobs, pipe_list

def get_frame2L(videoURL, send_end, L):
    v = cv2.VideoCapture()
    #[video.open(best.url)
    #L.acquire()
    v.open(videoURL)
    print("get_frame2", videoURL, v, send_end)
    #cv2.waitKey(0)
    while True:      
      ret, frame = v.read()
      if ret: send_end.send(frame); #cv2.imshow("FRAME", frame); cv2.waitKey(1)   
      else: print("NOT READ!"); break
    #send_end.send(v.read()[1])
    #L.release()
    
def get_framesUL(videosURL, L):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames


def get_frames(videos, L):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    print("VIDEOS:",videos)    
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames
    
def get_frame(video, send_end, L):
    L.acquire()
    print("get_frame", video, send_end)
    send_end.send(video.read()[1])
    L.release()
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))

    
def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
      ret, frame = v.read()
      if ret: send_end.send(frame)
      else: break
      
    
def merge_frames(frames: typing.List[np.ndarray]):
    #cv2.imshow("FRAME0", frames[0]) ########## not images/small
    #cv2.imshow("FRAME1", frames[1]) ##########
    #cv2.imshow("FRAME2", frames[2]) ##########
    #cv2.imshow("FRAME3", frames[3]) ##########
    #cv2.waitKey(1)
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    
    
    return np.vstack(rows)


if __name__ == '__main__':
    main()

Edit #1 idea: create one process per video stream and read it in a loop (pumping frames into the pipe), instead of creating a new process for every frame; and open the video/VideoCapture object inside the process from the videoURL passed through, instead of sending the VideoCapture object itself. (I don't know whether that form would have had the same pickle problem.)

...
in main:
bestURLS = []
[bestURLS.append(best.url) for best in streams]
proc, pipes = get_framesULJ(bestURLS, LOCK)



def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
      ret, frame = v.read()
      if ret: send_end.send(frame)
      else: break

def get_framesULJ(videosURL, L): # return the processes; join in main and read the frames there
    print("get_framesULJ:", videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()

    return jobs, pipe_list

Original answer:

<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
  File "y.py", line 104, in <module>
    main()
  File "y.py", line 48, in main
    frames = get_frames(videos)
  File "y.py", line 80, in get_frames
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object

Z:\>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

It fails before p.start. The instances are created and the structures seem fine:

VIDEOS: [<VideoCapture 000000000D418710>, <VideoCapture 000000000D4186F0>, <VideoCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P =  <Process name='Process-1' parent=8892 initial>
JOBS, len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90>

See the pickle module docs:

https://docs.python.org/3/library/pickle.html

It seems that not everything can be "pickled".

What can be pickled and unpickled?

The following types can be pickled:

None, True, and False

integers, floating point numbers, complex numbers

strings, bytes, bytearrays

tuples, lists, sets, and dictionaries containing only picklable objects

functions defined at the top level of a module (using def, not lambda)

built-in functions defined at the top level of a module

classes that are defined at the top level of a module

instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details).
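A quick way to check whether an object survives pickling is to try pickle.dumps on it. Since cv2 may not be installed everywhere, the sketch below uses a threading.Lock as a stand-in for an OS-handle-backed object like VideoCapture:

```python
import pickle
import threading

# Plain containers of picklable objects pickle fine.
payload = {"frames": [1, 2, 3], "size": (1280, 720)}
restored = pickle.loads(pickle.dumps(payload))
print(restored == payload)  # → True

# Objects wrapping OS-level handles (locks, sockets, cv2.VideoCapture)
# raise TypeError instead.
try:
    pickle.dumps(threading.Lock())
    picklable = True
except TypeError:
    picklable = False
print(picklable)  # → False
```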

Also, there seems to be a bug in opencv that causes this. One of the solutions given is to turn off multiprocessing...

Python multiprocessing can't pickle opencv VideoCapture object

https://github.com/MVIG-SJTU/AlphaPose/issues/164

Haoshu Fang commented on Oct 17, 2018:

This bug is due to multiprocessing in opencv. --sp disables multiprocessing. BTW, could you tell me the opencv version you are using?

My guess is it is something about locking memory or the like.

One workaround I would try is to first dump the object's pixels as plain/raw data, possibly with a header about the size etc.

Also, in general, I think some buffering needs to be added for smoother playback.
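A bounded queue per stream is one simple form of such buffering. A sketch using threads and queue.Queue for brevity; frame numbers stand in for real frames:

```python
import queue
import threading

buf = queue.Queue(maxsize=8)  # bounded buffer: producer blocks when display lags

def producer():
    # Hypothetical grabber: pushes decoded frames as fast as they arrive.
    for frame_no in range(20):
        buf.put(frame_no)   # blocks once 8 frames are queued
    buf.put(None)           # sentinel: end of stream

t = threading.Thread(target=producer)
t.start()

shown = []
while True:
    frame = buf.get()       # display loop pops at its own pace
    if frame is None:
        break
    shown.append(frame)
t.join()
print(len(shown))  # → 20
```

The maxsize bound keeps memory flat while absorbing jitter between the network/decoder and the display loop.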

BTW, what version of openCV do you have? Mine is 4.2.0.

answered 2020-12-23T02:16:05.070