2

我们有一个 Flask 应用程序,它需要向 VernqMQ 代理上的不同订阅发送消息。我们使用 paho.mqtt 库连接到代理,并且我们使用基于 JWT 的身份验证,该身份验证通过自定义 webhook 脚本验证代理。但是,JWT 每 7 天过期一次,因此 Flask 应用程序需要更新该令牌并重新连接。我们目前正在 on_disconnect 事件中处理这个问题。此设置有效,但是,一段时间后,我们开始收到此错误消息:

can't register client {xxxx} with username 'xxxx' due to register_subscriber_retry_exhausted

该消息在代理的日志文件中出现了数百次,但我们找不到有关此错误的任何信息。以下是我们如何在 Flask 中建立/更新 MQTT 代理连接的代码:

def mqtt_on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("MQTT broker Connection successful")
    else:
        print("MQTT broker Connection attempt failed.")


def mqtt_on_disconnect(client, userdata, rc):
    token = crypto.get_jwt('our_username').decode()
    client.username_pw_set("our_username", password=f'Bearer {token}')
    try:
        client.connect(
            app.config['MQTT_BROKER_URL'],
            port=app.config['MQTT_BROKER_PORT']
        )
    except Exception as ex:
        print("Could not reconnect to broker.")
        print(ex)


def establish_mqtt_connection(app, get_jwt):
    token = get_jwt(
        'our_username'
    ).decode()
    client_id = str(uuid.uuid4())
    if app.config['ENVIRONMENT'] not in {'LOCAL', 'TEST'}:
        client_id = str(uuid.uuid4())
        mqtt_client = mqtt.Client(
        f'control_{client_id}',
        clean_session=True,
        transport="websockets"
    )
    mqtt_client.tls_set(
        ca_certs=certifi.where(),
        tls_version=ssl.PROTOCOL_TLSv1_2
    )
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_disconnect = mqtt_on_disconnect
    mqtt_client.username_pw_set("our_username", password=f'Bearer {token}')
    host = app.config['MQTT_BROKER_URL']
    mqtt_client._host = host
    mqtt_client.loop_start()
    try:
        mqtt_client.connect(
            app.config['MQTT_BROKER_URL'],
            port=app.config['MQTT_BROKER_PORT'],
        )
    except Exception as ex:
        print(f'Exception establishing connection to MQTT broker: {ex}')

    app.config['MQTT_CLIENT'] = mqtt_client

为什么会出现此错误消息,我们可以做些什么来解决它?我们如何设置代理连接有问题吗?任何有关此的信息将不胜感激。

附加信息:代理是 VerneMQ 的单个实例,而不是集群。所有配置值都设置为默认值。

4

0 回答 0