1

我有一个 TCP 服务器正在运行,并且有一个处理函数,它需要获取请求的内容,将其添加到 asyncio 队列并以 OK 状态回复。

在后台,我运行了一个异步协程,它检测何时添加新项目并执行一些处理。

如何将项目从处理程序函数放入异步队列中,这不是也不能是异步协程?

我正在运行一个 DICOM 服务器pynetdicom,它在端口 104 上侦听传入的 TCP 请求(C-STORE特别是 DICOM)。

我需要将请求的内容保存到队列并返回0x0000响应,以便侦听器可用于网络。

这是由生产者-消费者模式建模的。

由于无法正确定义生产者,我试图定义一个consume_dicom()当前卡住的消费者协程。await queue.get()

生产者需要简单地调用queue.put(produce_item),但这发生在一个handle_store(event)函数内部,该函数不是该函数的一部分,event_loop而是在每次服务器接收到请求时调用。

import asyncio

from pynetdicom import (
    AE, evt,
    StoragePresentationContexts
)

class PacsServer():
    def __init__(self, par, listen=True):
        # Initialize other stuff...

        # Initialize DICOM server
        ae = AE(ae_title='DICOM-NODE')
        ae.supported_contexts = StoragePresentationContexts

        # When a C-STORE request comes, it will be passed to self.handle_store
        handlers = [(evt.EVT_C_STORE, self.handle_store)]

        # Define queue
        loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=loop)

        # Define consumer
        loop.create_task(self.consume_dicom(self.queue))

        # Start server in the background with specified handlers
        self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)

        # Start async loop
        self.loop.run_forever()



    def handle_store(self, event):
        # Request handling
        ds = event.dataset

        # Here I want to add to the queue but this is not an async method
        await queue.put(ds)

        return 0x0000


    async def consume_dicom(self, queue):
        while True:
            print(f"AWAITING FROM QUEUE")
            ds = await queue.get()

            do_some_processing(ds)

我想找到一种将项目添加到队列并在handle_store()函数中返回 OK 状态的方法。

4

1 回答 1

1

由于handle_store在不同的线程中运行,它需要告诉事件循环将项目入队。这是通过以下方式完成的call_soon_threadsafe

self.loop.call_soon_threadsafe(queue.put_nowait, ds)

请注意,您需要调用queue.put_nowait而不是queue.put因为前者是函数而不是协程。对于无界队列(默认),该函数将始终成功,否则如果队列已满,它将引发异常。

于 2019-09-09T12:11:55.710 回答