2

我正在编写一个 Python 程序来与基于 CAN 总线的设备进行交互。为此,我成功地使用了 python-can 模块。我还使用 asyncio 来响应异步事件。我编写了一个“CanBusSequencer”类使用的“CanBusManager”类。“CanBusManager”类负责生成/发送/接收消息,CanBusSequencer 驱动要发送的消息序列。
在序列中的某个时刻,我想等到收到特定消息以“解锁”序列中要发送的剩余消息。代码概述:

主文件

async def main():
   
   event = asyncio.Event()
   sequencer = CanBusSequencer(event)
   task = asyncio.create_task(sequencer.doSequence())
   await task
 
asyncio.run(main(), debug=True)

canBusSequencer.py

from canBusManager import CanBusManager

class CanBusSequencer:
 
   def __init__(self, event)
 
      self.event = event
      self.canManager = CanBusManager(event)

   async def doSequence(self):
 
      for index, row in self.df_sequence.iterrows():
         if:...
            self.canManager.sendMsg(...)
         else:
            self.canManager.sendMsg(...)
            await self.event.wait()
            self.event.clear()

canBusManager.py

import can

class CanBusManager():
 
   def __init__(self, event):
 
      self.event = event
      self.startListening()
 
 **EDIT**
    def startListening(self):
    
       self.msgNotifier = can.Notifier(self.canBus, self.receivedMsgCallback)
 **EDIT**
 
   def receivedMsgCallback(self, msg):
 
      if(msg == ...):
         self.event.set()
   

现在我的程序仍然等待 self.event.wait(),即使收到了相关消息并执行了 self.event.set()。使用 debug = True 运行程序会显示一个

RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

我真的不明白。它与异步事件循环有关,不知何故没有正确定义/管理。我来自 C++ 世界,目前正在用 Python 编写我的第一个大型程序。任何指导将不胜感激:)

4

1 回答 1

1

您的问题没有解释您如何安排receivedMsgCallback被调用。

如果它是由使用幕后线程的经典“异步”API 调用的,那么它将从运行事件循环的线程外部调用。根据文档,异步原语不是线程安全的,因此从另一个线程调用event.set()无法与正在运行的事件循环正确同步,这就是为什么您的程序没有在应该唤醒的时候唤醒。

如果你想从事件循环线程之外做任何与异步相关的事情,例如调用Event.set,你需要使用call_soon_threadsafe或等效。例如:

    def receivedMsgCallback(self, msg):
        if msg == ...:
            self.loop.call_soon_threadsafe(self.event.set)

事件循环对象应该对CanBusManager对象可用,可能通过将其传递给其构造函数并将其分配给self.loop.

附带说明一下,如果您创建一个任务只是为了立即等待它,那么您首先不需要任务。换句话说,您可以task = asyncio.create_task(sequencer.doSequence()); await task用更简单的await sequencer.doSequence().

于 2020-07-09T08:39:56.030 回答