简短说明:
当使用 rpyc 将队列创建为远程对象时,对 q.join() 的调用将被阻塞,即使显然队列为空且所有对象都已用 task_done() 标记
上下文:
在数据采集应用程序中,我有一个运行在 Raspberry pi 上的 python 脚本,它从传感器读取数据并将其发送到远程计算机。我的模型是这样的:
- Raspi 正在运行一个经典的 rpyc 服务器。
- 我写了一个模块和类,其中一个函数将创建一个队列,启动一个线程以用数据填充队列并将队列作为返回值返回
- 我的客户端创建到 rpyc 的连接,创建我的采集类的实例,调用“开始采集”来获取队列的句柄,然后尝试从队列中获取数据。
当我在脚本结束时调用 q.join() 时,我正面临僵局
我做了一个简化的例子来说明这个问题。在本地运行时,程序按预期工作。当队列从远程 rpyc 服务器实例化时,它将挂起。我不明白为什么代码会卡住。
如果您有想法或调查/调试的方法,我很高兴收到您的来信!
import rpyc
import queue
import threading
import time
# Test 1: local queue
#q=queue.Queue()
# Test 2: queue through rpyc server
conn=rpyc.classic.connect("localhost",18812)
m=conn.modules["queue"]
q=m.Queue()
# Fill in the queue
for i in range(3):
q.put(i)
# Worker thread that fetches data from queue as soon as data is available
def worker():
while True:
print("Worker thread. Queue size=%d"%q.qsize() )
if not q.empty():
data = q.get(block=False)
print("fetched data : %d" % (data))
time.sleep(1)
q.task_done()
time.sleep(0.1)
# Start worker thread
threading.Thread(target=worker,daemon=True).start()
# Wait until all data has been fetched from queue by worker thread
while not q.empty():
time.sleep(0)
print("Queue size %d" % q.qsize())
print("Queue empty %d" % q.empty())
print("join")
# Queue must be empty before calling join
q.join()
print("Finished")
未注释“test1”的输出:
Worker thread. Queue size=3
fetched data : 0
Worker thread. Queue size=2
fetched data : 1
Worker thread. Queue size=1
fetched data : 2
Queue size 0
Queue empty 1
join
Finished
未注释 Test2 代码的输出:
Worker thread. Queue size=3
fetched data : 0
Worker thread. Queue size=2
fetched data : 1
Worker thread. Queue size=1
fetched data : 2
Queue size 0
Queue empty 1
join
Traceback (most recent call last):
File "scratches\scratch_5.py", line 41, in <module>
q.join()
File "D:\Tools\Python39\lib\site-packages\rpyc\core\netref.py", line 240, in __call__
return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
File "D:\Tools\Python39\lib\site-packages\rpyc\core\netref.py", line 63, in syncreq
return conn.sync_request(handler, proxy, *args)
File "D:\Tools\Python39\lib\site-packages\rpyc\core\protocol.py", line 473, in sync_request
return self.async_request(handler, *args, timeout=timeout).value
File "D:\Tools\Python39\lib\site-packages\rpyc\core\async_.py", line 100, in value
self.wait()
File "D:\Tools\Python39\lib\site-packages\rpyc\core\async_.py", line 49, in wait
raise AsyncResultTimeout("result expired")
TimeoutError: result expired
PS:使用 python 3.9 & rpyc 5.0.1
编辑: 我找到了一种解决方法,但不明白为什么它会起作用......如果我在尝试加入队列之前很好地结束并加入线程,它就会起作用。
在工作函数末尾添加的代码:
if getattr(threading.currentThread(), "stop_request", False) and q.empty():
break
主线程中添加的代码:
# Wait until all data has been fetched from queue by worker thread
while not q.empty():
time.sleep(0)
t.stop_request=True
print("thread join")
t.join()
tadaaam,现在队列加入没有麻烦。但我仍然不明白为什么正在运行的线程会阻止队列加入功能完成。