您可以将 URL 作为输入参数传递给 FFmpeg,而不是通过管道将数据传递给 FFmpeg。
从 WEB 站点 URL 获取流 URL:
def stream_to_url(url, quality='best'):
    """Resolve a web page URL to the direct URL of one of its streams.

    Looks up the streams Streamlink finds for ``url`` and returns the URL
    of the entry stored under the ``quality`` key.
    """
    available = Streamlink().streams(url)
    selected = available[quality]
    return selected.to_url()
使用 FFprobe 获取视频分辨率(如果需要):
# Ask FFprobe about the stream (select_streams='v' is forwarded to ffprobe)
# and take the frame size from the first entry it reports.
probe_result = ffmpeg.probe(stream_url, select_streams='v')
first_stream = probe_result['streams'][0]
width, height = first_stream['width'], first_stream['height']
以 URL 作为输入、以原始(BGR)格式作为输出,执行 FFmpeg 子进程:
# Describe the pipeline: video part of the URL input -> raw bgr24 frames on stdout.
video_input = ffmpeg.input(stream_url).video
raw_output = video_input.output('pipe:', format='rawvideo', pix_fmt='bgr24')

# In case ffmpeg is not in the executable path, add cmd=fullpath, like:
#   raw_output.run_async(pipe_stdout=True, cmd=r'c:\FFmpeg\bin\ffmpeg.exe')
process = raw_output.run_async(pipe_stdout=True)
从 PIPE 读取帧,转换为 NumPy 数组,重塑和显示:
...
# One frame is width * height pixels at 3 bytes per pixel.
frame_size = width * height * 3
in_bytes = process.stdout.read(frame_size)
pixels = np.frombuffer(in_bytes, np.uint8)
frame = pixels.reshape([height, width, 3])
cv2.imshow('frame', frame)
...
完整的代码示例:
from streamlink import Streamlink
import numpy as np
import cv2
import ffmpeg
def stream_to_url(url, quality='best'):
    """Resolve a web page URL to the direct URL of one of its streams.

    Raises:
        ValueError: if Streamlink reports no streams for ``url``.
    """
    available = Streamlink().streams(url)
    if not available:
        raise ValueError('Could not locate your stream.')
    return available[quality].to_url()
url = 'https://www.twitch.tv/riotgames'  # Login to twitch TV before starting (the URL is for a random live stream).
quality = 'best'

stream_url = stream_to_url(url, quality)

# Use FFprobe to get video frames resolution (required in case resolution is unknown).
p = ffmpeg.probe(stream_url, select_streams='v')
width = p['streams'][0]['width']
height = p['streams'][0]['height']

# Size in bytes of one bgr24 frame (3 bytes per pixel).
frame_size = width * height * 3

# Execute FFmpeg sub-process with URL as input and raw (BGR) output format.
# In case ffmpeg is not in the executable path, add cmd=fullpath, like:
#   .run_async(pipe_stdout=True, cmd=r'c:\FFmpeg\bin\ffmpeg.exe')
process = (
    ffmpeg
    .input(stream_url)
    .video
    .output('pipe:', format='rawvideo', pix_fmt='bgr24')
    .run_async(pipe_stdout=True)
)

# Read decoded video (frame by frame), and display each frame (using cv2.imshow).
# try/finally makes sure the pipe, the sub-process and the window are released
# even if the loop is interrupted by an exception.
try:
    while True:
        # Read raw video frame from stdout as bytes array.
        in_bytes = process.stdout.read(frame_size)

        # A short read means end of stream (possibly in the middle of a frame);
        # reshaping a truncated buffer would raise ValueError, so stop here.
        if len(in_bytes) < frame_size:
            break

        # Transform the bytes read into a NumPy array.
        frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

        # Display the frame; press 'q' to quit.
        cv2.imshow('frame', frame)
        if (cv2.waitKey(1) & 0xFF) == ord('q'):
            break
finally:
    process.stdout.close()
    process.wait()
    cv2.destroyAllWindows()
还有一个更简单的解决方案,即使用 cv2.VideoCapture:
stream_url = stream_to_url(url, quality)

# Let OpenCV open the stream URL directly.
cap = cv2.VideoCapture(stream_url)

# Show frames until a read fails or the user presses 'q'.
success, frame = cap.read()
while success:
    cv2.imshow('frame', frame)
    if (cv2.waitKey(1) & 0xFF) == ord('q'):
        break
    success, frame = cap.read()

cap.release()
cv2.destroyAllWindows()
更新:
从 Streamlink 子进程到 FFmpeg 子进程的管道:
假设您必须从 Streamlink 的 stdout 管道读取流并将其写入 FFmpeg 的 stdin 管道:
启动 Streamlink 子进程(使用 -O 管道参数):
# Windows executable downloaded from: https://github.com/streamlink/streamlink/releases/tag/2.4.0
streamlink_exe = r'c:\Program Files (x86)\Streamlink\bin\streamlink.exe'
streamlink_args = [streamlink_exe, stream_url, "best", "-O"]

# Execute Streamlink as sub-process, capturing its stdout through a pipe.
streamlink_process = sp.Popen(streamlink_args, stdout=sp.PIPE)
实现一个线程,从 Streamlink 的标准输出管道读取块并写入 FFmpeg 标准输入管道:
def writer(streamlink_proc, ffmpeg_proc):
    """Copy Streamlink's stdout into FFmpeg's stdin in chunks of 1024 bytes.

    Intended to run in its own thread. Copying stops when Streamlink's
    stdout reaches end-of-file, when FFmpeg has exited, or when writing to
    FFmpeg fails. FFmpeg's stdin is always closed on the way out so that
    FFmpeg sees end-of-input instead of waiting for more data.

    Args:
        streamlink_proc: process object exposing a readable ``stdout``.
        ffmpeg_proc: process object exposing ``poll()`` and a writable ``stdin``.
    """
    try:
        # poll() returns None while the process is running. Compare against
        # None explicitly: an exit code of 0 is falsy and would otherwise be
        # mistaken for "still running".
        while ffmpeg_proc.poll() is None:
            chunk = streamlink_proc.stdout.read(1024)
            if not chunk:
                # Empty read means end-of-file: the source has finished.
                break
            ffmpeg_proc.stdin.write(chunk)
    except (BrokenPipeError, OSError):
        # FFmpeg closed its end of the pipe; nothing more can be delivered.
        pass
    finally:
        try:
            ffmpeg_proc.stdin.close()
        except (BrokenPipeError, OSError):
            # Flushing buffered data into an already-broken pipe can fail; ignore.
            pass
使用输入管道和输出管道执行 FFmpeg 子进程:
# Describe the pipeline: video part of the piped input -> raw bgr24 frames on stdout.
piped_video = ffmpeg.input('pipe:').video
raw_output = piped_video.output('pipe:', format='rawvideo', pix_fmt='bgr24')

# In case ffmpeg is not in the executable path, add cmd=fullpath, like:
#   raw_output.run_async(pipe_stdin=True, pipe_stdout=True, cmd=r'c:\FFmpeg\bin\ffmpeg.exe')
ffmpeg_process = raw_output.run_async(pipe_stdin=True, pipe_stdout=True)
创建并启动线程:
# Run writer(streamlink_process, ffmpeg_process) in a background thread.
thread = threading.Thread(
    target=writer,
    args=(streamlink_process, ffmpeg_process),
)
thread.start()
完整的代码示例:
import numpy as np
import subprocess as sp
import threading
import cv2
import ffmpeg
#stream_url = 'https://www.nimo.tv/v/v-1712291636586087045'
stream_url = 'https://www.twitch.tv/esl_csgo'

# Assume video resolution is known.
width = 1920
height = 1080
# Writer thread (read from streamlink and write to FFmpeg in chunks of 1024 bytes).
def writer(streamlink_proc, ffmpeg_proc):
    """Copy Streamlink's stdout into FFmpeg's stdin in chunks of 1024 bytes.

    Intended to run in its own thread. Copying stops when Streamlink's
    stdout reaches end-of-file, when FFmpeg has exited, or when writing to
    FFmpeg fails. FFmpeg's stdin is always closed on the way out so that
    FFmpeg sees end-of-input instead of waiting for more data.

    Args:
        streamlink_proc: process object exposing a readable ``stdout``.
        ffmpeg_proc: process object exposing ``poll()`` and a writable ``stdin``.
    """
    try:
        # poll() returns None while the process is running. Compare against
        # None explicitly: an exit code of 0 is falsy and would otherwise be
        # mistaken for "still running".
        while ffmpeg_proc.poll() is None:
            chunk = streamlink_proc.stdout.read(1024)
            if not chunk:
                # Empty read means end-of-file: the source has finished.
                break
            ffmpeg_proc.stdin.write(chunk)
    except (BrokenPipeError, OSError):
        # FFmpeg closed its end of the pipe; nothing more can be delivered.
        pass
    finally:
        try:
            ffmpeg_proc.stdin.close()
        except (BrokenPipeError, OSError):
            # Flushing buffered data into an already-broken pipe can fail; ignore.
            pass
streamlink_args = [r'c:\Program Files (x86)\Streamlink\bin\streamlink.exe', stream_url, "best", "-O"]  # Windows executable downloaded from: https://github.com/streamlink/streamlink/releases/tag/2.4.0
streamlink_process = sp.Popen(streamlink_args, stdout=sp.PIPE)  # Execute streamlink as sub-process

# Execute FFmpeg sub-process with piped input and raw (BGR) piped output.
# In case ffmpeg is not in the executable path, add cmd=fullpath, like:
#   .run_async(pipe_stdin=True, pipe_stdout=True, cmd=r'c:\FFmpeg\bin\ffmpeg.exe')
ffmpeg_process = (
    ffmpeg
    .input('pipe:')
    .video
    .output('pipe:', format='rawvideo', pix_fmt='bgr24')
    .run_async(pipe_stdin=True, pipe_stdout=True)
)

# Daemon thread: a writer that is still blocked on a pipe must not keep the
# interpreter alive after the main loop has finished.
thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process), daemon=True)
thread.start()

# Size in bytes of one bgr24 frame (3 bytes per pixel).
frame_size = width * height * 3

# Read decoded video (frame by frame), and display each frame (using cv2.imshow).
# try/finally makes sure both sub-processes and the window are released even
# if the loop is interrupted by an exception.
try:
    while True:
        # Read raw video frame from stdout as bytes array.
        in_bytes = ffmpeg_process.stdout.read(frame_size)

        # A short read means end of stream (possibly in the middle of a frame);
        # reshaping a truncated buffer would raise ValueError, so stop here.
        if len(in_bytes) < frame_size:
            break

        # Transform the bytes read into a NumPy array.
        frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

        # Display the frame; press 'q' to quit.
        cv2.imshow('frame', frame)
        if (cv2.waitKey(1) & 0xFF) == ord('q'):
            break
finally:
    ffmpeg_process.stdout.close()
    ffmpeg_process.wait()
    streamlink_process.kill()
    streamlink_process.wait()  # Reap the killed process.
    thread.join(timeout=5)     # Give the writer a moment to notice and exit.
    cv2.destroyAllWindows()
注意事项:
- 代码示例使用的是 twitch.tv 的链接而不是 nimo.tv 的链接,因为“Nimo 破坏了 Streamlink 插件”。
- 该示例假定预先知道宽度和高度。
- 该示例在 Windows 10 上进行了测试(安装 Streamlink 后,执行 streamlink.exe:r'c:\Program Files (x86)\Streamlink\bin\streamlink.exe')。