0

简短说明
当使用 rpyc 将队列创建为远程对象时,对 q.join() 的调用将被阻塞,即使显然队列为空且所有对象都已用 task_done() 标记

上下文
在数据采集应用程序中,我有一个运行在 Raspberry pi 上的 python 脚本,它从传感器读取数据并将其发送到远程计算机。我的模型是这样的:

  • Raspi 正在运行一个经典的 rpyc 服务器。
  • 我写了一个模块和类,其中一个函数将创建一个队列,启动一个线程以用数据填充队列并将队列作为返回值返回
  • 我的客户端创建到 rpyc 的连接,创建我的采集类的实例,调用“开始采集”来获取队列的句柄,然后尝试从队列中获取数据。

当我在脚本结束时调用 q.join() 时,我正面临僵局

我做了一个简化的例子来说明这个问题。在本地运行时,程序按预期工作。当队列从远程 rpyc 服务器实例化时,它将挂起。我不明白为什么代码会卡住。

如果您有想法或调查/调试的方法,我很高兴收到您的来信!

import rpyc
import queue
import threading
import time

# Test 1: local queue
#q=queue.Queue()

# Test 2: queue through rpyc server
conn=rpyc.classic.connect("localhost",18812)
m=conn.modules["queue"]
q=m.Queue()

# Fill in the queue
for i in range(3):
    q.put(i)

# Worker thread that fetches data from queue as soon as data is available
def worker():
    while True:
        print("Worker thread. Queue size=%d"%q.qsize() )
        if not q.empty():
            data = q.get(block=False)
            print("fetched data : %d" % (data))
            time.sleep(1)
            q.task_done()
        time.sleep(0.1)

# Start worker thread
threading.Thread(target=worker,daemon=True).start()

# Wait until all data has been fetched from queue by worker thread
while not q.empty():
    time.sleep(0)

print("Queue size %d" % q.qsize())
print("Queue empty %d" % q.empty())

print("join")
# Queue must be empty before calling join
q.join()

print("Finished")

未注释“test1”的输出:

Worker thread. Queue size=3
fetched data : 0
Worker thread. Queue size=2
fetched data : 1
Worker thread. Queue size=1
fetched data : 2
Queue size 0
Queue empty 1
join
Finished

未注释 Test2 代码的输出:

Worker thread. Queue size=3
fetched data : 0
Worker thread. Queue size=2
fetched data : 1
Worker thread. Queue size=1
fetched data : 2
Queue size 0
Queue empty 1
join
Traceback (most recent call last):
  File "scratches\scratch_5.py", line 41, in <module>
    q.join()
  File "D:\Tools\Python39\lib\site-packages\rpyc\core\netref.py", line 240, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "D:\Tools\Python39\lib\site-packages\rpyc\core\netref.py", line 63, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "D:\Tools\Python39\lib\site-packages\rpyc\core\protocol.py", line 473, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "D:\Tools\Python39\lib\site-packages\rpyc\core\async_.py", line 100, in value
    self.wait()
  File "D:\Tools\Python39\lib\site-packages\rpyc\core\async_.py", line 49, in wait
    raise AsyncResultTimeout("result expired")
TimeoutError: result expired

PS:使用 python 3.9 & rpyc 5.0.1

编辑: 我找到了一种解决方法,但不明白为什么它会起作用......如果我在尝试加入队列之前很好地结束并加入线程,它就会起作用。

在工作函数末尾添加的代码:

        if getattr(threading.currentThread(), "stop_request", False) and q.empty():
            break

主线程中添加的代码:

# Wait until all data has been fetched from queue by worker thread
while not q.empty():
    time.sleep(0)
t.stop_request=True
print("thread join")
t.join()

tadaaam,现在队列加入没有麻烦。但我仍然不明白为什么正在运行的线程会阻止队列加入功能完成。

4

0 回答 0