0

我正在编写一个 Python 脚本,该脚本将使用Autobahn Python托管一个 websockets 服务器并用作中继,将流式传输到服务器的音频发送到 IBM Watson 进行转录,然后将从 Watson 接收到的转录 JSON 数据转发到客户端 -侧浏览器。

以下脚本适用于单个会话,直到 websocket 刷新/重新连接或 IBM Watson 服务超时(在 30 秒不活动后)。当用户刷新浏览器并重新建立 websocket 时,Watson 转录将恢复,但现在返回的转录突然完全乱码。

我认为问题在于 Watson 识别线程 ( recognize_thread) 没有正确关闭,所以我实际上是在创建多个线程来覆盖同一个队列。我已经尝试了各种方法来杀死识别线程但onClose没有成功(因为我启动recognize_thread.start()onConnect()我似乎无法访问recognize_thread.join().kill()事件onClose()中)。我还读到过在 Python 中突然杀死一个线程并不是最好的做法。

最终目标是服务器可以在自己的线程上接受和托管多个同时用户会话,接收多个不同的音频流,将音频存储在他们自己独特的队列实例中,并将音频流式传输到他们自己独特的 Watson 识别实例. 但我是 Python 线程的新手,可以使用一些关于如何进行此操作的指针。

from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For autobahn
import json
from autobahn.twisted.websocket import WebSocketServerProtocol, \
    WebSocketServerFactory
from twisted.internet import reactor

try:
    from Queue import Queue, Full
except ImportError:
    from queue import Queue, Full

###############################################
#### Initalize queue to store the recordings ##
###############################################
CHUNK = 1024
# Note: It will discard if the websocket client can't consumme fast enough
# So, increase the max size as per your choice
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
# Create an instance of AudioSource
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ########
###############################################

# initialize speech to text service
authenticator = IAMAuthenticator('secretkey')
speech_to_text = SpeechToTextV1(authenticator=authenticator)

# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    def __init__(self):
        RecognizeCallback.__init__(self)

    def on_transcription(self, transcript):
        # Forward to client
        MyServerProtocol.broadcast_message(transcript)

    def on_connected(self):
        print('Connection was successful')

    def on_error(self, error):
        # Forward to client
        MyServerProtocol.broadcast_message('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        # Forward to client
        MyServerProtocol.broadcast_message('Inactivity timeout: {}'.format(error))

    def on_listening(self):
        print('Service is listening')

    def on_hypothesis(self, hypothesis):
        # Forward to client
        MyServerProtocol.broadcast_message(hypothesis)

    def on_data(self, data):
        # Forward to client
        MyServerProtocol.broadcast_message(data)

    def on_close(self):
        print("Connection closed")
        MyServerProtocol.broadcast_message("Connection closed")

class MyServerProtocol(WebSocketServerProtocol):
    connections = list()

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))
        self.connections.append(self)
        # Start recognizer on connection
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        if isBinary:
            # Put incoming audio into the queue
            try:
                q.put(payload)
            except Full:
                pass # discard
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    @classmethod
    def broadcast_message(cls, data):
        payload = json.dumps(data, ensure_ascii = False).encode('utf8')
        for c in set(cls.connections):
            reactor.callFromThread(cls.sendMessage, c, payload)
      

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
        self.connections.remove(self)
  
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    mycallback = MyRecognizeCallback()
    speech_to_text.recognize_using_websocket(audio=audio_source,
                                            content_type='audio/l16; rate=16000',
                                            recognize_callback=mycallback,
                                            interim_results=True)

if __name__ == '__main__':

factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    factory.protocol = MyServerProtocol

    reactor.listenTCP(9001, factory)
    reactor.run()
4

0 回答 0