我正在尝试让 NATS 订阅者持续收听已发布的消息。
My Publisher 是一个 API 端点,可以在浏览器中访问。我的订阅者是一个 python 应用程序,它应该永远运行,监听发布的消息。
我的问题是订户从不打印任何东西。如果我将 run_forever() 更改为 loop.close(),它可以工作但会立即关闭。我知道出版商正在工作,因为我可以看到来自 NATS 服务器的打印输出。
我正在 docker-compose 中运行所有内容。
我的订阅者:
import asyncio
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("nats://nats:4222", loop=loop)
async def message_handler_A(msg):
print('fsfdsfdsfdsfdsf')
async def message_handler_B(msg):
print('fdsfdsfdsfdsf')
async def message_handler_C(msg):
print('sdfdsfdsf')
await nc.subscribe("message_handler_A", cb=message_handler_A)
await nc.subscribe("message_handler_B", cb=message_handler_B)
await nc.subscribe("message_handler_C", cb=message_handler_C)
print('receiving')
if __name__ == '__main__':
print("RUNNING")
nc = NATS()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.run_forever()
我的发布者:
import connexion
import six
import json
import asyncio
from nats.aio.client import Client as NATS
from swagger_server import util
async def run(loop):
nc = NATS()
# [begin publish_json]
await nc.connect("nats://nats:4222", loop=loop)
for i in range(10):
await nc.publish("message_handler_B", b"")
await nc.publish("message_handler_C", b"")
await nc.publish("message_handler_A", b"")
def healthz_get(): # noqa: E501
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.close()
return 'Processing Request'
我的码头工人撰写:
version: '3'
services:
nats:
image: 'nats:0.8.0'
entrypoint: "/gnatsd -DV"
expose:
- "4222"
ports:
- "4222:4222"
hostname: nats-server
data_api:
restart: always
build: ..\data_api
image: data_api
container_name: data_api
ports:
- "5022:5022"
depends_on:
- "POCpostgres"
- "queue_app"
queue_app:
build: ..\queue_app
image: queue_app
container_name: queue_app
ports:
- "5023:5023"