我是 pythongatt
模块的新手,我遇到了重新连接的问题。基本上我要做的是使用pythongatt
模块(https://github.com/getsenic/gatt-python)建立与蓝牙低功耗(BLE)设备的连接,然后使用模块从/dev/input/eventX
路径中读取输入evdev
. 我还想自动化重新连接过程,因此当设备超出范围并返回时,它将重新连接并继续正常工作。
问题是,当设备断开连接并最终重新连接时(通过像这样的简单例程:catch disconnect message -> try to reconnect),如果重新连接时间超过 2-3 分钟,则连接过程不会创建新/dev/input/eventX
路径。如果在前 1-2 分钟之间重新连接成功,则不会发生这种情况。
2-3分钟过去后我得到的错误是:
文件“/usr/lib/python3.7/site-packages/dbus/proxies.py”,第 145 行, 调用 文件“/usr/lib/python3.7/site-packages/dbus/connection.py”,行651,在 call_blocking dbus.exceptions.DBusException: org.freedesktop.DBus.Error.NoReply: 没有收到回复。可能的原因包括:远程应用程序未发送回复、消息总线安全策略阻止回复、回复超时或网络连接中断。
该脚本的核心如下:
def reconnect(mac_address):
try:
devices[mac_address].connect()
except:
print(f"thread from {mac_address} crashed")
class AnyDevice(gatt.Device):
shut_down_flag = False
def connect_succeeded(self):
super().connect_succeeded()
print(f"{self.mac_address} Connected")
def connect_failed(self, error):
super().connect_failed(error)
print(f"{self.mac_address} Connection failed.")
reconnect_thread = threading.Thread(target=reconnect, name=f'reconnect {self.mac_address}',args=(self.mac_address,))
reconnect_thread.start()
def disconnect_succeeded(self):
super().disconnect_succeeded()
print(f"{self.mac_address} Disconnected")
if not self.shut_down_flag:
reconnect_thread = threading.Thread(target=reconnect, name=f'reconnect {self.mac_address}',args=(self.mac_address,))
reconnect_thread.start()
def gatt_connect_device(mac_address):
global devices
devices.update({f'{mac_address}': AnyDevice(mac_address=f'{mac_address}', manager=manager)})
devices[f'{mac_address}'].connect()
#==== OPEN bd_addresses.txt JSON FILE ====#
if path.exists("bd_addresses.txt"):
with open("bd_addresses.txt", "r") as mac_addresses_json:
mac_addresses = json.load(mac_addresses_json)
else:
print("bd_addresses.txt file NOT FOUND\nPlace it in the same directory as the multiple_scanners.py")
#========================================#
devices={}
manager = gatt.DeviceManager(adapter_name='hci0')
for scanner_number in mac_addresses:
device_instance_thread=threading.Thread(target=gatt_connect_device, name=f'device instance for {mac_addresses[scanner_number]}', args=(mac_addresses[scanner_number],))
device_instance_thread.start()
time.sleep(3)
manager.run()