我有一个 TCP 服务器正在运行,并且有一个处理函数,它需要获取请求的内容,将其添加到 asyncio 队列并以 OK 状态回复。
在后台,我运行了一个异步协程,它检测何时添加新项目并执行一些处理。
如何将项目从处理程序函数放入异步队列中,这不是也不能是异步协程?
我正在运行一个 DICOM 服务器pynetdicom,它在端口 104 上侦听传入的 TCP 请求(C-STORE
特别是 DICOM)。
我需要将请求的内容保存到队列并返回0x0000
响应,以便侦听器可用于网络。
这是由生产者-消费者模式建模的。
由于无法正确定义生产者,我试图定义一个consume_dicom()
当前卡住的消费者协程。await queue.get()
生产者需要简单地调用queue.put(produce_item)
,但这发生在一个handle_store(event)
函数内部,该函数不是该函数的一部分,event_loop
而是在每次服务器接收到请求时调用。
import asyncio
from pynetdicom import (
AE, evt,
StoragePresentationContexts
)
class PacsServer():
def __init__(self, par, listen=True):
# Initialize other stuff...
# Initialize DICOM server
ae = AE(ae_title='DICOM-NODE')
ae.supported_contexts = StoragePresentationContexts
# When a C-STORE request comes, it will be passed to self.handle_store
handlers = [(evt.EVT_C_STORE, self.handle_store)]
# Define queue
loop = asyncio.get_event_loop()
self.queue = asyncio.Queue(loop=loop)
# Define consumer
loop.create_task(self.consume_dicom(self.queue))
# Start server in the background with specified handlers
self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)
# Start async loop
self.loop.run_forever()
def handle_store(self, event):
# Request handling
ds = event.dataset
# Here I want to add to the queue but this is not an async method
await queue.put(ds)
return 0x0000
async def consume_dicom(self, queue):
while True:
print(f"AWAITING FROM QUEUE")
ds = await queue.get()
do_some_processing(ds)
我想找到一种将项目添加到队列并在handle_store()
函数中返回 OK 状态的方法。