0

我目前遇到关于在 python3 中使用文本/事件流延迟响应的问题。

我遇到的问题是 message inconsistencies。从服务器发送的消息并不总是被客户端接收。

我正在使用 POSTMAN 通过发送连续消息将测试数据发送到服务器。有时客户端会收到我发送的消息,然后在暂停几秒钟后,客户端将不再收到我发送的任何消息,然后过一会儿又开始接收。

这是我目前正在使用的示例代码。

sse.py

from typing import Iterator
import random
import string

from collections import deque
from flask import Response, request
from gevent.queue import Queue
import gevent

def generate_id(size=6, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


class ServerSentEvent(object):
    """Class to handle server-sent events."""
    def __init__(self, data, event):
        self.data = data
        self.event = event
        self.event_id = generate_id(),
        self.retry = 5000
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.event_id: "id",
            self.retry: 5000
        }

    def encode(self) -> str:
        """Encodes events as a string."""
        if not self.data:
            return ""
        lines = ["{}: {}".format(name, key)
                 for key, name in self.desc_map.items() if key]

        return "{}\n\n".format("\n".join(lines))


class Channel(object):
    def __init__(self, history_size=32):
        self.subscriptions = []
        self.history = deque(maxlen=history_size)
        self.history.append(ServerSentEvent('start_of_history', None))

    def notify(self, message):
        """Notify all subscribers with message."""
        for sub in self.subscriptions[:]:
            sub.put(message)

    def event_generator(self, last_id) -> Iterator[ServerSentEvent]:
        """Yields encoded ServerSentEvents."""
        q = Queue()
        self._add_history(q, last_id)
        self.subscriptions.append(q)
        try:
            while True:
                yield q.get()
        except GeneratorExit:
            self.subscriptions.remove(q)

    def subscribe(self):
        def gen(last_id) -> Iterator[str]:
            for sse in self.event_generator(last_id):
                yield sse.encode()
        return Response(
            gen(request.headers.get('Last-Event-ID')),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "text/event-stream"
            })

    def _add_history(self, q, last_id):
        add = False
        for sse in self.history:
            if add:
                q.put(sse)
            if sse.event_id == last_id:
                add = True

    def publish(self, message, event=None):
        sse = ServerSentEvent(str(message), event)
        self.history.append(sse)
        gevent.spawn(self.notify, sse)

    def get_last_id(self) -> str:
        return self.history[-1].event_id

服务.py

import json
import os

import requests
from app.controllers.sse import Channel
from flask import send_file, \
    jsonify, request, Blueprint, Response
from typing import Iterator

blueprint = Blueprint(__name__, __name__, url_prefix='')
flask_channel = Channel()

@blueprint.route("/stream")
def stream():
    return flask_channel.subscribe()


@blueprint.route('/sample/create', methods=['GET'])
def sample_create():
    branch_id = request.args.get('branch_id', None)
    params = request.get_json()
    if not params:
        params = {
            'id': 'sample_id',
            'description': 'sample_description'
        }
    flask_channel.publish(json.dumps(params), event=branch_id)
    return jsonify({'success': True}), 200

我想知道是否有更好的解决方案。我将不胜感激任何帮助或想法。

4

0 回答 0