2

我正在尝试让一个 NATS 订阅者持续监听已发布的消息。

我的发布者是一个 API 端点,可以在浏览器中访问。我的订阅者是一个 Python 应用程序,它应该一直运行,监听已发布的消息。

我的问题是订阅者从不打印任何内容。如果我将 run_forever() 更改为 loop.close(),它可以工作,但会立即退出。我知道发布者工作正常,因为我可以看到 NATS 服务器的打印输出。

我正在 docker-compose 中运行所有内容。

我的订阅者:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    """Connect to the NATS server and subscribe to the three test subjects.

    This coroutine uses the module-level ``nc`` client created in the
    ``__main__`` section of this script; it does not construct its own.

    Args:
        loop: Event loop handed to ``nc.connect``.
    """
    await nc.connect("nats://nats:4222", loop=loop)

    # Each handler only prints a fixed marker string so that the subject
    # a message arrived on can be told apart in the output.
    async def message_handler_A(msg):
        print('fsfdsfdsfdsfdsf')

    async def message_handler_B(msg):
        print('fdsfdsfdsfdsf')

    async def message_handler_C(msg):
        print('sdfdsfdsf')

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    print("RUNNING")

    # Module-level client: run() reads this name as a global.
    nc = NATS()
    # Create a dedicated event loop and install it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) 


    # Run run() to completion first so the subscriptions are registered...
    loop.run_until_complete(run(loop))
    # ...then keep the loop running; run_forever() only returns once
    # loop.stop() is called.
    loop.run_forever()

我的发布者:

import connexion
import six
import json
import asyncio
from nats.aio.client import Client as NATS

from swagger_server import util

async def run(loop):
    """Publish ten empty messages to each of the three test subjects.

    A new NATS connection is opened on every call. It is flushed and
    closed before returning so that repeated calls do not leak
    connections.

    Args:
        loop: Event loop handed to ``nc.connect``.
    """
    nc = NATS()
    # [begin publish_json]
    await nc.connect("nats://nats:4222", loop=loop)
    try:
        for _ in range(10):
            await nc.publish("message_handler_B", b"")
            await nc.publish("message_handler_C", b"")
            await nc.publish("message_handler_A", b"")
        # The caller closes the event loop right after this coroutine
        # returns, so make sure pending messages reach the server first.
        await nc.flush()
    finally:
        # Always release the connection, even if publishing failed.
        await nc.close()

def healthz_get():  # noqa: E501
    """Run the publisher coroutine on a fresh event loop.

    Returns:
        The literal string ``'Processing Request'``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(loop))
    finally:
        # Close the loop even when run() raises, so a failed request
        # does not leak an open event loop.
        loop.close()

    return 'Processing Request'

我的 docker-compose 文件:

version: '3'

services:
  # NATS server; -DV enables debug and verbose (trace) logging.
  nats:
    image: 'nats:0.8.0'
    entrypoint: "/gnatsd -DV"
    expose:
      - "4222"
    ports:
      - "4222:4222"
    hostname: nats-server

  data_api:
    restart: always
    build: ..\data_api
    image: data_api
    container_name: data_api
    ports:
      - "5022:5022"
    depends_on:
      # NOTE(review): "POCpostgres" is not defined in this snippet;
      # presumably it was omitted from the question. Confirm it exists.
      - "POCpostgres"
      - "queue_app"

  queue_app:
    build: ..\queue_app
    image: queue_app
    container_name: queue_app
    ports:
      - "5023:5023"
4

1 回答 1

4

答案是使用 NATS Streaming 服务(STAN):

订阅者:

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    """Open a NATS Streaming session and subscribe to ``greetings``.

    Args:
        loop: Event loop passed to the NATS client as ``io_loop``.
    """
    async def on_message(msg):
        # Report the sequence number and payload of each delivered message.
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))

    nats_client = NATS()
    streaming_client = STAN()

    # The streaming session is layered on an established NATS
    # connection, so the plain client has to connect first.
    await nats_client.connect(io_loop=loop)
    await streaming_client.connect(
        "test-cluster", "client-123", nats=nats_client
    )

    # start_at='first' requests every message from the beginning.
    await streaming_client.subscribe(
        "greetings", start_at='first', cb=on_message
    )

if __name__ == "__main__":
    # Set up the subscription first, then keep the loop running so the
    # subscriber callback can be invoked.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(run(event_loop))
    event_loop.run_forever()

发布者:

import connexion
import six
from swagger_server import util

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    """Publish one ``Hello World!`` message to ``greetings`` via STAN.

    Both the streaming session and the underlying NATS connection are
    closed before returning, including when publishing fails.

    Args:
        loop: Event loop passed to the NATS client as ``io_loop``.
    """
    nc = NATS()
    sc = STAN()

    # First connect to NATS, then start session with NATS Streaming.
    await nc.connect(io_loop=loop)
    try:
        await sc.connect("test-cluster", "client-456", nats=nc)
        try:
            await sc.publish("greetings", b'Hello World!')
            await nc.flush(1)
            print("sent")
        finally:
            # Close the streaming session before the connection it uses.
            await sc.close()
    finally:
        await nc.close()

def healthz_get():  # noqa: E501
    """Run the publisher coroutine on a fresh event loop.

    Returns:
        The literal string ``'Processing Request'``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(loop))
    finally:
        # Close the loop even when run() raises, so a failed request
        # does not leak an open event loop.
        loop.close()

    return 'Processing Request'
于 2019-08-15T14:59:10.487 回答