After I receive a POST request with data, I send it to a GCP Pub/Sub topic. I also have a websocket endpoint where I want to read the data that was sent earlier:

@sockets.route('/ws/<string:_id>/some_url/')
def some_function(ws, _id):
    if _id not in config.MINES:  # was `mine_id`, which is not defined in this function
        ws.close()
    while not ws.closed:
        ws.send(json.dumps({
            'type': 'some type',
            'data': get_data(_id, 'some type')
        }))
        time.sleep(100)

It gets the data from a generics.ListAPIView:

def get_data(_id, data_type):
    if data_type in config.URLS:
        url = data_type
    else:
        return {'error': 'URL is wrong'}
    try:
        response = requests.get(
            '/'.join([config.API_URL, f"{_id}/" + url + "/"]),
            headers={'Authorization': ' '.join(['Token', config.TOKEN])}
        )
        response.raise_for_status()  # otherwise HTTPError is never raised for 4xx/5xx responses
        return response.json()

    except HTTPError:
        return {'error': 'HTTPError'}
    except RequestException:
        return {'error': 'RequestException'}

This is the consumer code:

def consume():
    try:
        project_id = 'some project'
        subscription_name = 'some subscription'

        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)

        def callback(message):
            print('Received message: {}'.format(message.data))
            if message.attributes:
                print('Attributes:')
                for key in message.attributes:
                    value = message.attributes.get(key)
                    print('{}: {}'.format(key, value))
            message.ack()

        future = subscriber.subscribe(
            subscription_path, callback=callback)
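        try:
            # result() blocks until the subscription is cancelled or fails
            data = future.result()
        except Exception as e:
            logger.exception(
                'Listening for some type messages on {} threw an Exception: {}.'.format(
                    subscription_name, e))
            data = None  # avoid an unbound name at the final return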

    except ConnectionError:
        return {'error': 'ConnectionError'}

    return data

I have tried the websockets and the producer/consumer, and each works on its own. How can I make them work together, so that the ws.send call uses consume() or the data it returns?
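
To make the goal concrete, here is a minimal sketch of the kind of bridge I have in mind, not a tested solution. It assumes the Pub/Sub callback and the websocket handler live in the same process and share a thread-safe queue. The names `messages` and `forward_messages` are hypothetical; `callback` stands in for the one passed to `subscriber.subscribe(...)`, and `ws` only needs the `closed` attribute and `send()` method already used above.

```python
import json
import queue

# Thread-safe buffer shared by the Pub/Sub callback thread and the websocket handler.
messages = queue.Queue()


def callback(message):
    """Pub/Sub callback: pass the payload to the websocket side, then ack."""
    messages.put(message.data.decode('utf-8'))  # message.data is bytes
    message.ack()


def forward_messages(ws, timeout=1.0):
    """Send every queued payload over ws until the socket is closed."""
    while not ws.closed:
        try:
            payload = messages.get(timeout=timeout)
        except queue.Empty:
            continue  # nothing arrived yet; re-check ws.closed
        ws.send(json.dumps({'type': 'some type', 'data': payload}))
```

With this shape, the subscription would be started once at application startup (without calling `future.result()` inside the request handler, since that call blocks), and `some_function` would call `forward_messages(ws)` instead of polling with `time.sleep`. One queue shared by all sockets means each message reaches only one client; a queue per connection would be needed for broadcast. If the server runs on gevent, a gevent-aware queue may be required; that part is an assumption I have not verified.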
