我有一个连接到 MQTT 的应用程序,我想发布 1200 个设备,每个设备的 id 作为元数据。以下是代码
"use-strict"
const RSVP = require('rsvp');
const Mqtt = require('mqtt');
const cls = require('cls-hooked');
const namespace = "firstName";
let clsNamespace;
let client = Mqtt.connect("alis://test.mosquitto.org");
if (!client) {
logger.Error("Test", 'Init', 'No mqtt client provided');
throw new extError('No mqtt client created');
}
client.on('connect', async () => {
console.log("Connected");
try {
clsNamespace = cls.createNamespace(namespace);
main();
} catch (error) {
console.log(error);
}
});
function main() {
var devices = [];
for (var i = 0; i < 1200; i++) {
devices.push({ "id": i });
}
RSVP.all(devices.map(async (item) => await updateDevice(item)));
}
async function updateDevice(device) {
try {
return await wrapContext(clsNamespace, async () => {
setContext({ device: device.id });
console.log("update " + device.id + " metadata =" + JSON.stringify(__getMetadata()));
return publish("message", device.id);
});
} catch (error) {
console.log(error);
}
}
function setContext(context) {
try {
let ctxKeys = clsNamespace.get('contextKeys') ? clsNamespace.get('contextKeys') : [];
for (const key in context) {
clsNamespace.set(key, context[key]);
if (ctxKeys.indexOf(key) === -1) {
ctxKeys.push(key);
}
}
clsNamespace.set('contextKeys', ctxKeys);
} catch (error) {
console.error(error);
console.log('cannot set context', context);
throw error;
}
}
function publish(message, deviceId) {
return new RSVP.Promise((resolve, reject) => {
try {
client.publish(message,
deviceId,
(error) => {
if (error) {
console.log("error")
reject(error);
} else {
console.log("publish " + deviceId + " metadata" + JSON.stringify(__getMetadata()));
resolve();
}
});
} catch (error) {
console.log(error);
}
});
}
async function wrapContext(cls, callback) {
let defer = RSVP.defer();
let context = await cls.run(async (contextObj) => {
try {
let result = await callback(contextObj);
defer.resolve(result);
} catch (error) {
defer.reject(error);
}
});
return defer.promise;
};
function __getMetadata() {
const metadata = {};
let contextData = {};
for (const key of clsNamespace.get('contextKeys') || []) {
contextData[key] = clsNamespace.get(key);
}
for (const key in contextData) {
metadata[key] = contextData[key];
}
return metadata;
}
输出如下:
update 0 metadata ={"device":0}
publish 0 metadata{"device":0}
update 1 metadata ={"device":1}
publish 1 metadata{"device":1}
... (same thing for 1165 devices)
update 1166 metadata ={"device":1166}
update 1167 metadata ={"device":1167}
update 1168 metadata ={"device":1168}
update 1169 metadata ={"device":1169}
... (same thing until 1199)
update 1199 metadata ={"device":1199}
publish 1166 metadata{"device":1199}
publish 1167 metadata{"device":1199}
publish 1168 metadata{"device":1199}
... (same thing until 1199)
如您所见,1165 首次发布日志的元数据是正确的,但是一旦迭代中断并且函数变得异步,则首次发布的元数据将不匹配。
有没有办法来解决这个问题?