有趣的应用!重新出现错误,我运行您的代码,消息是它无法腌制 VideoCapture 对象,请参阅下面的链接,这可能是接收管道为空的原因。两个线程有两个错误:第一个是泡菜,然后是 EOF。
编辑#2:我设法用每个视频一个进程运行它等:
关于性能,我首先在不合并图像的情况下进行操作(我必须修复一些细节),以查看它是否接收到了 3 帧和 4 帧,显示在与接收线程分开的窗口中,它播放得非常快,比实时更快(用 3-4 个流测试)。我认为显示的合并和调整大小很慢,图片为 4 个流 (4x1280x720) 获得 2560x1440。就我而言,它已调整大小以适合屏幕。
感谢分享那个问题和那个图书馆等!
(顺便说一句,我最初也尝试使用锁,但碰巧没有必要。代码需要清理一些实验。此外,当前实现可能不会为每个流的每帧同步,因为它没有加入每帧作为您的原始示例,它创建了新的进程来从每个帧中获取一个帧然后合并它们。)
CPU 负载主要在主进程中(一个 4 核 CPU,因此每个实例 max=25%):
有时:
0.06684677699999853 0.030737616999999773 1.2829999995744856e-06 镜头(框架)= 9 0.06703700200000284 0.030708104000002123 6.409990EN7997
可以调整主循环中的 waitKey。
代码
https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py
# Merging Youtube streams with pafy, opencv and multithreading
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code, properly close VideoCapture in the processes
import multiprocessing #Process, Lock
from multiprocessing import Lock # Not needed
import cv2
import numpy as np
import pafy
import typing
import timeit
import time
urls = [
"https://www.youtube.com/watch?v=tT0ob3cHPmE",
"https://www.youtube.com/watch?v=XmjKODQYYfg",
"https://www.youtube.com/watch?v=E2zrqzvtWio",
"https://www.youtube.com/watch?v=6cQLNXELdtw",
"https://www.youtube.com/watch?v=s_rmsH0wQ3g",
"https://www.youtube.com/watch?v=QfhpNe6pOqU",
"https://www.youtube.com/watch?v=C_9x0P0ebNc",
"https://www.youtube.com/watch?v=Ger6gU_9v9A",
"https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
# Merging seems to require equal number of sides, so 2x2, 3x3 etc. The resolutions should be the same.
'''
[
"https://www.youtube.com/watch?v=C_9x0P0ebNc",
"https://www.youtube.com/watch?v=Ger6gU_9v9A",
"https://www.youtube.com/watch?v=39dZ5WhDlLE",
"https://www.youtube.com/watch?v=QfhpNe6pOqU",
]
'''
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080
streams = []
#bestStreams = []
def main():
global bestStreams
streams = [pafy.new(url).getbest() for url in urls]
print(streams)
#[bestStreams for best in streams]
#print(bestStreams)
cv2.waitKey(0)
videos = [cv2.VideoCapture() for streams in streams]
bestURLS = []
#[video.open(best.url) for video, best in zip(videos, streams)] # Opened per process
[bestURLS.append(best.url) for best in streams]
#[ for video, best in zip(videos, streams)]
print(bestURLS)
cv2.waitKey(0)
cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
LOCK = Lock()
#proc = get_framesUL(bestStreams, LOCK)
#proc, pipes = get_framesULJ(bestStreams, LOCK)
proc, pipes = get_framesULJ(bestURLS, LOCK)
print("PROC, PIPES", proc, pipes)
#cv2.waitKey(0)
frames = []
numStreams = len(streams)
while True:
start_time = timeit.default_timer()
# frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
#frames = get_frames(videos, LOCK)
#frames = get_framesUL(streams, LOCK)
print(timeit.default_timer() - start_time)
start_time = timeit.default_timer()
frames = [x.recv() for x in pipes]
lf = len(frames)
print("LEN(FRAMES)=", lf);
#if lf<3: time.sleep(3); print("LEN(FRAMES)=", lf); #continue #Else merge and show
#proc.join()
#elif lf==3: frames = [x.recv() for x in pipes]
dst = merge_frames(frames)
print(timeit.default_timer() - start_time)
start_time = timeit.default_timer()
#if cv2!=None:
try:
cv2.imshow('Video', dst)
except: print("Skip")
#cv2.waitKey(1)
if cv2.waitKey(20) & 0xFF == ord('e'):
break
print(timeit.default_timer() - start_time)
continue
for proc in jobs:
proc.join()
# [video.release() for video in videos] # Per process
cv2.destroyAllWindows()
def get_framesULJ(videosURL, L): #return the processes, join in main and read the frames there
# frames = [video.read()[-1] for video in videos]
print("get_framesULJ:",videosURL)
jobs = []
pipe_list = []
#print("VIDEOS:",videosURL)
#for video in videos:
for videoURL in videosURL: #urls:
recv_end, send_end = multiprocessing.Pipe(False)
print(recv_end, send_end)
p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
#p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
#if (p==None): continue
print("P = ", p)
#time.sleep(0.001)
jobs.append(p)
print("JOBS, len", jobs, len(jobs))
pipe_list.append(recv_end)
print("pipe_list", pipe_list)
p.start()
#cv2.waitKey(0)
#for proc in jobs:
# proc.join()
#frames = [x.recv() for x in pipe_list]
#return frames
#cv2.waitKey(0)
return jobs, pipe_list
def get_frame2L(videoURL, send_end, L):
v = cv2.VideoCapture()
#[video.open(best.url)
#L.acquire()
v.open(videoURL)
print("get_frame2", videoURL, v, send_end)
#cv2.waitKey(0)
while True:
ret, frame = v.read()
if ret: send_end.send(frame); #cv2.imshow("FRAME", frame); cv2.waitKey(1)
else: print("NOT READ!"); break
#send_end.send(v.read()[1])
#L.release()
def get_framesUL(videosURL, L):
# frames = [video.read()[-1] for video in videos]
jobs = []
pipe_list = []
print("VIDEOS:",videosURL)
#for video in videos:
for videoURL in videosURL: #urls:
recv_end, send_end = multiprocessing.Pipe(False)
print(recv_end, send_end)
p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
#p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
#if (p==None): continue
print("P = ", p)
#time.sleep(0.001)
jobs.append(p)
print("JOBS, len", jobs, len(jobs))
pipe_list.append(recv_end)
print("pipe_list", pipe_list)
p.start()
for proc in jobs:
proc.join()
frames = [x.recv() for x in pipe_list]
return frames
def get_frames(videos, L):
# frames = [video.read()[-1] for video in videos]
jobs = []
pipe_list = []
print("VIDEOS:",videos)
for video in videos:
recv_end, send_end = multiprocessing.Pipe(False)
print(recv_end, send_end)
p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
#p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
#if (p==None): continue
print("P = ", p)
#time.sleep(0.001)
jobs.append(p)
print("JOBS, len", jobs, len(jobs))
pipe_list.append(recv_end)
print("pipe_list", pipe_list)
p.start()
for proc in jobs:
proc.join()
frames = [x.recv() for x in pipe_list]
return frames
def get_frame(video, send_end, L):
L.acquire()
print("get_frame", video, send_end)
send_end.send(video.read()[1])
L.release()
# send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))
def get_frame2(videoURL, send_end):
v = video.open(videoURL)
while True:
ret, frame = v.read()
if ret: send_end.send(frame)
else: break
def merge_frames(frames: typing.List[np.ndarray]):
#cv2.imshow("FRAME0", frames[0]) ########## not images/small
#cv2.imshow("FRAME1", frames[1]) ##########
#cv2.imshow("FRAME2", frames[2]) ##########
#cv2.imshow("FRAME3", frames[3]) ##########
#cv2.waitKey(1)
width = np.math.ceil(np.sqrt(len(frames)))
rows = []
for row in range(width):
i1, i2 = width * row, width * row + width
rows.append(np.hstack(frames[i1: i2]))
return np.vstack(rows)
if __name__ == '__main__':
main()
编辑#1 想法:为每个视频流创建一个进程并循环读取它(在管道中泵送),而不是为每一帧创建一个新进程,并且/因此通过管道打开带有 videoURL 的视频/VideoCapture 对象,而不是发送 VideoCapture 对象。(不知道这个表格有没有同样的泡菜问题)
...
in main:
bestURLS = []
proc, pipes = get_framesULJ(bestURLS, LOCK)
[bestURLS.append(best.url) for best in streams]
def get_frame2(videoURL, send_end):
v = video.open(videoURL)
while True:
ret, frame = v.read()
if ret: send_end.send(video)
else: break
def get_framesULJ(videosURL, L): #return the processes, join in main and read the frames there
print("get_framesULJ:",videosURL)
jobs = []
pipe_list = []
for videoURL in videosURL:
recv_end, send_end = multiprocessing.Pipe(False)
print(recv_end, send_end)
p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
print("P = ", p)
jobs.append(p)
print("JOBS, len", jobs, len(jobs))
pipe_list.append(recv_end)
print("pipe_list", pipe_list)
p.start()
return jobs, pipe_list
原答案:
<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multip
rocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
File "y.py", line 104, in <module>
main()
File "y.py", line 48, in main
frames = get_frames(videos)
File "y.py", line 80, in get_frames
p.start()
File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in
start
self._popen = self._Popen(self)
File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in
_Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 326, in
_Popen
return Popen(process_obj)
File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", lin
e 93, in __init__
reduction.dump(process_obj, to_child)
File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in
dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object
Z:\>Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in sp
awn_main
exitcode = _main(fd, parent_sentinel)
File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _m
ain
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
它在 p.start 之前失败。实例已创建,结构似乎还可以:
VIDEOS: [<VideoCapture 000000000D418710>, <VideoCapture 000000000D4186F0>, <Vide
oCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multip
rocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P = <Process name='Process-1' parent=8892 initial>
JOBS, len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90
>
请参阅模块泡菜:
https://docs.python.org/3/library/pickle.html
似乎并非所有东西都可以“腌制”。
什么可以腌制和不腌制?
可以腌制以下类型:
None, True, and False
integers, floating point numbers, complex numbers
strings, bytes, bytearrays
tuples, lists, sets, and dictionaries containing only picklable objects
functions defined at the top level of a module (using def, not lambda)
built-in functions defined at the top level of a module
classes that are defined at the top level of a module
instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details).
此外,opencv 中似乎有一个错误导致了这种情况。给出的解决方案之一是关闭多处理...
Python多进程无法腌制opencv videocapture对象
https://github.com/MVIG-SJTU/AlphaPose/issues/164
方浩树于 2018 年 10 月 17 日发表评论
这个错误是由于 opencv 中的多处理造成的。--sp 禁用多处理。顺便说一句,你能告诉我你正在使用的opencv版本吗?
我猜想关于锁定记忆之类的东西。
我会尝试的一种解决方法是首先将对象的像素转储为纯数据或原始数据,可能带有关于大小等的标题。
此外,一般来说,为了更流畅的播放,我认为需要添加一些缓冲。
顺便说一句,你的openCV是什么版本?我的是 4.2.0