我们有一个 Flask 应用程序,它需要向 VernqMQ 代理上的不同订阅发送消息。我们使用 paho.mqtt 库连接到代理,并且我们使用基于 JWT 的身份验证,该身份验证通过自定义 webhook 脚本验证代理。但是,JWT 每 7 天过期一次,因此 Flask 应用程序需要更新该令牌并重新连接。我们目前正在 on_disconnect 事件中处理这个问题。此设置有效,但是,一段时间后,我们开始收到此错误消息:
can't register client {xxxx} with username 'xxxx' due to register_subscriber_retry_exhausted
该消息在代理的日志文件中出现了数百次,但我们找不到有关此错误的任何信息。以下是我们如何在 Flask 中建立/更新 MQTT 代理连接的代码:
def mqtt_on_connect(client, userdata, flags, rc):
if rc == 0:
print("MQTT broker Connection successful")
else:
print("MQTT broker Connection attempt failed.")
def mqtt_on_disconnect(client, userdata, rc):
token = crypto.get_jwt('our_username').decode()
client.username_pw_set("our_username", password=f'Bearer {token}')
try:
client.connect(
app.config['MQTT_BROKER_URL'],
port=app.config['MQTT_BROKER_PORT']
)
except Exception as ex:
print("Could not reconnect to broker.")
print(ex)
def establish_mqtt_connection(app, get_jwt):
token = get_jwt(
'our_username'
).decode()
client_id = str(uuid.uuid4())
if app.config['ENVIRONMENT'] not in {'LOCAL', 'TEST'}:
client_id = str(uuid.uuid4())
mqtt_client = mqtt.Client(
f'control_{client_id}',
clean_session=True,
transport="websockets"
)
mqtt_client.tls_set(
ca_certs=certifi.where(),
tls_version=ssl.PROTOCOL_TLSv1_2
)
mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_disconnect = mqtt_on_disconnect
mqtt_client.username_pw_set("our_username", password=f'Bearer {token}')
host = app.config['MQTT_BROKER_URL']
mqtt_client._host = host
mqtt_client.loop_start()
try:
mqtt_client.connect(
app.config['MQTT_BROKER_URL'],
port=app.config['MQTT_BROKER_PORT'],
)
except Exception as ex:
print(f'Exception establishing connection to MQTT broker: {ex}')
app.config['MQTT_CLIENT'] = mqtt_client
为什么会出现此错误消息,我们可以做些什么来解决它?我们如何设置代理连接有问题吗?任何有关此的信息将不胜感激。
附加信息:代理是 VerneMQ 的单个实例,而不是集群。所有配置值都设置为默认值。