我研究 MQTT 协议已经有一段时间了。该协议将被组织用于分别向每个客户端发送确认消息。
在用例中,只有 1 个“发布者”客户端向订阅其独特主题的“订阅者”客户端发布确认消息
为了确保代理基本上没有错误并且将来可以轻松扩展,我一直在尝试通过尝试将至少 50,000 个客户端连接到它们来测试 Emqx 和 Vernemq 开源代理。
但是,我无法创建那么多连接。我在 Google Cloud 上的 Ubuntu 18.04 实例(8 核 CPU,15 GB RAM)在大约 300-400 之后无法建立任何更成功的连接。
我尝试进行以下更改: ulimit -n 64500 (用于允许这么多文件描述符,因为每个套接字连接都需要一个文件描述符)
请帮助我建立超过 50,000 个连接。我应该运行“n”个线程并在每个线程的循环下运行 total_clients/total_threads 客户端吗?
还是应该为每个客户端连接创建一个线程?
我应该怎么办?
一旦客户端开始断开连接,即使客户端没有发送断开数据包,以下消息也会出现在“$SYS/#”主题上。$SYS/brokers/emqx@127.0.0.1/clients/112/disconnected {"clientid":"112","username":"undefined","reason":"close","ts":1536587647}
import paho.mqtt.client as mqtt
from threading import Lock
import time
print_lock = Lock()
def s_print(str):
print_lock.acquire()
print(str)
print_lock.release()
def on_connect(client, userdata, flags, rc):
client_id = userdata["client_id"]
if (rc == 0):
s_print("connected " + client_id)
client.subscribe(client_id, 2)
def on_disconnect(client, userdata, rc):
client_id = userdata["client_id"]
s_print("disconnected: " + client_id + " reason " + str(rc))
def on_message(client, userdata, message):
topic = message.topic
payload = str(message.payload)
s_print("Recieved " + payload + " on " + topic)
if __name__ == '__main__':
n_clients = int(input("Enter no. of clients: "))
for i in range(n_clients):
client_id = str(i)
s_print(client_id)
userdata = {
"client_id" : client_id
}
client = mqtt.Client(client_id=client_id, clean_session=True, userdata=userdata)
client.on_connect = on_connect
client.on_disconnect =on_disconnect
client.on_message = on_message
client.connect("35.228.57.228", 1883, 60)
client.loop_start()
time.sleep(0.5)
while(1):
time.sleep(5)