0

我的问题如下:我编写了一个订阅一个主题的程序,其中两个具有一个键的字典分别每秒到达更多次。在每条消息上,他们都会改变他们的价值。我将这些字典保存在一个名为“Status”的大缓冲区字典中。我需要的是每秒将状态的“快照”保存到文件中。

我试过 time.sleep(1) 但它漂移了。由于已经存在客户端循环,我不知道如何处理时间表问题......

我对 python 和 mqtt 很陌生,非常感谢你的帮助

我的代码:

import paho.mqtt.client as mqtt
import time
import json

Status = {}

#create client instance
client = mqtt.Client(client_id=None, clean_session=True, transport="tcp")

#connect to broker
client.connect("my_broker", 1883)

#use subscribe() to subscribe to a topic and receive messages
client.subscribe("topic/#", qos=0)

def test1_callback(client, userdata, msg):
    msg_dict = json.loads((msg.payload))
    Status.update(msg_dict)

client.message_callback_add("topic/test1", test1_callback)

while True:
    client.loop_start()
    time.sleep(1)
    client.loop_stop()

    with open('Data.txt', 'a+') as file:
        t = time.localtime()
        Status["time"]= time.strftime("%H:%M:%S", t)
            file.write(str(Status["time"]) + " ")
            file.write(str(Status["key1"]) + " ")
            file.write(str(Status["key2"]) + " ")

    client.loop_start()
4

1 回答 1

0

我宁愿使用每秒触发的计时器,而不是手动停止网络线程。此外,在将数据存储到文件时锁定数据可能是个好主意 - 否则可能会在两者之间发生更新:

# ...
import threading

def test1_callback(client, userdata, msg):
   msg_dict = json.loads((msg.payload))
   lock.acquire()
   Status.update(msg_dict)
   lock.release()

def timer_event():
   lock.acquire()
   # save to file here
   lock.release()
   # restart timer
   threading.Timer(1, timer_event).start()

Status = {}
lock = threading.Lock()

# client initialization
# ...

client.loop_start()
threading.Timer(1, timer_event).start()

while True:
   pass

但这不会阻止您的存储值消失,因为该主题显然发布得太频繁了,因此您的订阅者(甚至是代理)无法足够快地处理消息。

因此,您可能希望缩短发布此主题的时间间隔。另请注意,您订阅了多级主题 - 即使除此之外的主题"topic/test1"未在您的代码中处理,它们仍然会导致代理和订阅客户端的负载

于 2020-08-10T12:07:20.197 回答