1

我是 pythongatt模块的新手,我遇到了重新连接的问题。基本上我要做的是使用pythongatt模块(https://github.com/getsenic/gatt-python)建立与蓝牙低功耗(BLE)设备的连接,然后使用模块从/dev/input/eventX路径中读取输入evdev. 我还想自动化重新连接过程,因此当设备超出范围并返回时,它将重新连接并继续正常工作。

问题是,当设备断开连接并最终重新连接时(通过像这样的简单例程:catch disconnect message -> try to reconnect),如果重新连接时间超过 2-3 分钟,则连接过程不会创建新/dev/input/eventX路径。如果在前 1-2 分钟之间重新连接成功,则不会发生这种情况。

2-3分钟过去后我得到的错误是:

文件“/usr/lib/python3.7/site-packages/dbus/proxies.py”,第 145 行, 调用 文件“/usr/lib/python3.7/site-packages/dbus/connection.py”,行651,在 call_blocking dbus.exceptions.DBusException: org.freedesktop.DBus.Error.NoReply: 没有收到回复。可能的原因包括:远程应用程序未发送回复、消息总线安全策略阻止回复、回复超时或网络连接中断。

该脚本的核心如下:

def reconnect(mac_address):
    try:
        devices[mac_address].connect()
    except:
        print(f"thread from {mac_address} crashed")

class AnyDevice(gatt.Device):
    shut_down_flag = False


    def connect_succeeded(self):
        super().connect_succeeded()
        print(f"{self.mac_address} Connected")

    def connect_failed(self, error):
        super().connect_failed(error)
        print(f"{self.mac_address} Connection failed.")
        reconnect_thread = threading.Thread(target=reconnect, name=f'reconnect {self.mac_address}',args=(self.mac_address,))
        reconnect_thread.start()

    def disconnect_succeeded(self):
        super().disconnect_succeeded()
        print(f"{self.mac_address} Disconnected")
        if not self.shut_down_flag:
            reconnect_thread = threading.Thread(target=reconnect, name=f'reconnect {self.mac_address}',args=(self.mac_address,))
            reconnect_thread.start()

def gatt_connect_device(mac_address):
    global devices
    devices.update({f'{mac_address}': AnyDevice(mac_address=f'{mac_address}', manager=manager)})
    devices[f'{mac_address}'].connect()


#==== OPEN bd_addresses.txt JSON FILE ====#
if path.exists("bd_addresses.txt"):
    with open("bd_addresses.txt", "r") as mac_addresses_json:
        mac_addresses = json.load(mac_addresses_json)
else:
    print("bd_addresses.txt file NOT FOUND\nPlace it in the same directory as the multiple_scanners.py")
#========================================#



devices={}
manager = gatt.DeviceManager(adapter_name='hci0')

for scanner_number in mac_addresses:
    device_instance_thread=threading.Thread(target=gatt_connect_device, name=f'device instance for {mac_addresses[scanner_number]}', args=(mac_addresses[scanner_number],))
    device_instance_thread.start()
    time.sleep(3)



manager.run()
4

0 回答 0