我使用 OpenCV 的 VideoCapture.read() 捕获视频帧,并将帧发送到 WebSocket 服务器(基于 Twisted 和 Autobahn WebSocket API)。我还使用 Twisted 的 IPushProducer 接口把数据流式传输到 WebSocket,服务器再将数据发回给客户端,最后释放摄像头。
这是我的代码。
服务器.py
import cv2
import cv2.cv as cv
import numpy as np
from autobahn.twisted.websocket import WebSocketServerProtocol, \
    WebSocketServerFactory, \
    listenWS

from VideoStreamClient import BATCH_SIZE
class VideoStreamServerProtocol(WebSocketServerProtocol):
    """Server-side protocol that consumes a streamed binary message.

    Incoming data is handled through the frame-level streaming callbacks
    (``onMessageFrameBegin`` / ``onMessageFrameData``) instead of
    ``onMessage``.  Received bytes are counted, and each time the count
    reaches ``BATCH_SIZE`` the chunk that crossed the threshold is sent back
    to the client as one binary message and the counter is reset.
    """

    def onConnect(self, request):
        """Log the peer address of a connecting client."""
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        """Log that the WebSocket handshake has completed."""
        print("WebSocket connection open.")

    def onClose(self, wasClean, code, reason):
        """Log the reason the connection was closed."""
        print("WebSocket connection closed: {0}".format(reason))

    def onMessageBegin(self, isBinary):
        """Delegate to the base class when a new message starts."""
        WebSocketServerProtocol.onMessageBegin(self, isBinary)

    def onMessageFrameBegin(self, length):
        """Reset the byte counter and threshold at the start of each frame."""
        WebSocketServerProtocol.onMessageFrameBegin(self, length)
        self.received = 0
        self.next = BATCH_SIZE

    def onMessageFrameData(self, payload):
        """Count incoming bytes and echo a chunk once the threshold is hit.

        Only the chunk that crosses the threshold is echoed; chunks received
        before it are counted but not forwarded.
        """
        self.received += len(payload)
        if self.received >= self.next:
            # sendMessage() emits a complete, self-contained message, which
            # is what the client's onMessage() callback receives.  The
            # frame-level sendMessageFrameData() takes no ``isBinary``
            # keyword and is only valid after beginMessage() /
            # beginMessageFrame(), neither of which this class calls.
            self.sendMessage(payload, isBinary=True)
            self.received = 0

    def onMessageFrameEnd(self):
        """Do nothing when a frame ends; the base-class handler is not called."""
        pass

    def onMessageEnd(self):
        """Do nothing when a message ends; the base-class handler is not called."""
        pass
class VideoStreamServerFactory(WebSocketServerFactory):
    """Factory that builds ``VideoStreamServerProtocol`` instances.

    The factory is bound to a fixed WebSocket URL with debugging disabled.
    """

    protocol = VideoStreamServerProtocol

    def __init__(self):
        # The base initializer is called explicitly, as in the original.
        listen_url = "ws://localhost:9000"
        WebSocketServerFactory.__init__(self, listen_url, debug=False)
if __name__ == '__main__':
    import sys

    from twisted.internet import reactor
    from twisted.python import log

    # Route Twisted's log output to stdout before anything starts listening.
    log.startLogging(sys.stdout)

    # Start listening with the video stream factory, then hand control to
    # the reactor's event loop.
    server_factory = VideoStreamServerFactory()
    listenWS(server_factory)
    reactor.run()
客户端.py(服务器通过 `from VideoStreamClient import ...` 导入该模块,因此应保存为 VideoStreamClient.py)
from autobahn.twisted.websocket import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS
from zope.interface import implementer
from twisted.internet import reactor, interfaces
import cv2
import numpy as np
# Payload length announced for each WebSocket frame the producer begins.
# This is the largest signed 64-bit integer (2**63 - 1).
FRAME_SIZE = 0x7FFFFFFFFFFFFFFF

# Batch size in bytes: 1 MiB.
BATCH_SIZE = 2 ** 20
@implementer(interfaces.IPushProducer)
class VideoStreamProducer:
    """Push producer that streams JPEG-encoded camera frames to a WebSocket.

    On the first ``resumeProducing()`` call the camera (device 0) is opened
    at 640x480 and a binary WebSocket message/frame is begun on ``proto``.
    Frames are then captured, JPEG-encoded and written as frame data until
    ``pauseProducing()`` is called.

    Args:
        proto: Protocol object providing ``beginMessage``,
            ``beginMessageFrame`` and ``sendMessageFrameData``.
        encode_param: Optional sequence of encoder parameters forwarded to
            ``cv2.imencode``.  When omitted, OpenCV's defaults are used.
    """

    def __init__(self, proto, encode_param=None):
        self.proto = proto
        self.started = False
        self.paused = False
        # Created lazily on first resume so constructing a producer does not
        # open the camera.
        self.cap = None
        self.encode_param = list(encode_param) if encode_param is not None else []

    def pauseProducing(self):
        """Ask the production loop to stop after the current frame."""
        self.paused = True

    def resumeProducing(self):
        """Start or continue capturing and sending frames until paused."""
        self.paused = False
        if not self.started:
            self.cap = cv2.VideoCapture(0)
            self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_WIDTH, 640)
            self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT, 480)
            self.proto.beginMessage(isBinary=True)
            self.proto.beginMessageFrame(FRAME_SIZE)
            self.started = True
        while not self.paused:
            isSuccess, frame = self.cap.read()
            if not isSuccess:
                # No frame was delivered.  Leave the loop rather than spin
                # inside this call; production continues on the next
                # resumeProducing().
                print("Camera read failed; frame production stopped.")
                break
            data = self._encode_frame(frame)
            # sendMessageFrameData() returns the octets remaining in the
            # current frame; start a new frame once it is used up.
            if self.proto.sendMessageFrameData(data) <= 0:
                self.proto.beginMessageFrame(FRAME_SIZE)

    def stopProducing(self):
        """Stop producing and release the camera if it was opened."""
        self.paused = True
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def _encode_frame(self, frame):
        """Return ``frame`` JPEG-encoded as a byte string."""
        if self.encode_param:
            _, encoded = cv2.imencode(".jpg", frame, self.encode_param)
        else:
            _, encoded = cv2.imencode(".jpg", frame)
        # imencode() returns a NumPy array; the transport needs raw bytes.
        return encoded.tobytes()
class VideoStreamClientProtocol(WebSocketClientProtocol):
    """Client protocol that streams camera frames and reports reply sizes."""

    def onConnect(self, response):
        """Ignore the handshake response."""
        pass

    def onOpen(self):
        """Attach a video producer to this connection and start it."""
        frame_source = VideoStreamProducer(self)
        # True registers the producer as a streaming (push) producer.
        self.registerProducer(frame_source, True)
        frame_source.resumeProducing()

    def onMessage(self, payload, isBinary):
        """Print the length of each message received."""
        print(len(payload))
if __name__ == '__main__':
    # Build a client factory for the fixed URL and bind the streaming
    # protocol to it.
    client_factory = WebSocketClientFactory("ws://localhost:9000")
    client_factory.protocol = VideoStreamClientProtocol

    # Initiate the connection, then run the reactor's event loop.
    connectWS(client_factory)
    reactor.run()