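After I receive a POST request with data, I publish it from my view to a GCP Pub/Sub topic. I also have websockets through which I want to read the data that was sent earlier: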
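import json
import time

@sockets.route('/ws/<string:_id>/some_url/')
def some_function(ws, _id):
    # Reject connections for ids that are not configured.
    if _id not in config.MINES:
        ws.close()
        return
    # Push a fresh snapshot to the client until the socket closes.
    while not ws.closed:
        ws.send(json.dumps({
            'type': 'some type',
            'data': get_data(_id, 'some type')
        }))
        time.sleep(100)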
It gets the data from a generics.ListAPIView:
import requests
from requests.exceptions import HTTPError, RequestException

def get_data(_id, data_type):
    if data_type in config.URLS:
        url = data_type
    else:
        return {'error': 'URL is wrong'}
    try:
        response = requests.get(
            '/'.join([config.API_URL, f"{_id}/" + url + "/"]),
            headers={'Authorization': ' '.join(['Token', config.TOKEN])}
        )
        # Without this call, requests never raises HTTPError for 4xx/5xx responses.
        response.raise_for_status()
        return response.json()
    except HTTPError:
        return {'error': 'HTTPError'}
    except RequestException:
        return {'error': 'RequestException'}
This is the consumer code:
from google.cloud import pubsub_v1

def consume():
    data = None  # stays None if listening fails before a result is available
    try:
        project_id = 'some project'
        subscription_name = 'some subscription'
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)

        def callback(message):
            print('Received message: {}'.format(message.data))
            if message.attributes:
                print('Attributes:')
                for key in message.attributes:
                    value = message.attributes.get(key)
                    print('{}: {}'.format(key, value))
            message.ack()

        future = subscriber.subscribe(
            subscription_path, callback=callback)
        try:
            # Blocks the calling thread while the subscription is active.
            data = future.result()
        except Exception as e:
            logger.exception(
                'Listening for some type messages on {} threw an Exception: {}.'.format(
                    subscription_name, e))
    except ConnectionError:
        return {'error': 'ConnectionError'}
    return data
I have tried the websockets and the producer/consumer separately, and each works on its own. How can I make them work together, so that the ws.send call uses consume() or the data it returns?
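To make the question concrete, here is a minimal sketch of the kind of bridge I have in mind, not a confirmed solution. It assumes the Pub/Sub callback runs on a background thread and hands payloads to the websocket loop through a thread-safe queue. `PubSubBridge`, `next_payload` and the `maxsize` default are hypothetical names and values I made up for illustration, and the sketch assumes `message.data` is UTF-8 encoded bytes.

```python
import json
import queue


class PubSubBridge:
    """Hypothetical helper: buffers Pub/Sub payloads for a websocket loop."""

    def __init__(self, maxsize=1000):
        # Thread-safe buffer shared by the subscriber thread and the ws loop.
        self._messages = queue.Queue(maxsize=maxsize)

    def callback(self, message):
        # Intended to be passed as callback= to subscriber.subscribe().
        try:
            self._messages.put_nowait(message.data.decode('utf-8'))
        except queue.Full:
            # Buffer is full: ask Pub/Sub to redeliver the message later.
            message.nack()
            return
        message.ack()

    def next_payload(self, timeout=1.0):
        # Returns a JSON string ready for ws.send(), or None if nothing arrived.
        try:
            data = self._messages.get(timeout=timeout)
        except queue.Empty:
            return None
        return json.dumps({'type': 'some type', 'data': data})


# Possible wiring (assumes google-cloud-pubsub and the route from above):
#   bridge = PubSubBridge()
#   subscriber.subscribe(subscription_path, callback=bridge.callback)
#   while not ws.closed:
#       payload = bridge.next_payload()
#       if payload is not None:
#           ws.send(payload)
```

In this sketch `subscriber.subscribe(...)` is not followed by `future.result()`, so the call returns immediately and the websocket loop can keep running. A single shared queue delivers each message to only one connected client, so several clients would presumably need one queue each. I have also not checked how a blocking `get` behaves under gevent without monkey-patching.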