1

我有一个连接到 MQTT 的应用程序,我想发布 1200 个设备,每个设备的 id 作为元数据。以下是代码

"use-strict"
const RSVP = require('rsvp');
const Mqtt = require('mqtt');
const cls = require('cls-hooked');

const namespace = "firstName";
let clsNamespace;

let client = Mqtt.connect("alis://test.mosquitto.org");


if (!client) {
    logger.Error("Test", 'Init', 'No mqtt client provided');
    throw new extError('No mqtt client created');
}

client.on('connect', async () => {
    console.log("Connected");
    try {
        clsNamespace = cls.createNamespace(namespace);
        main();
    } catch (error) {
        console.log(error);
    }
});


function main() {
    var devices = [];
    for (var i = 0; i < 1200; i++) {
        devices.push({ "id": i });

    }
    RSVP.all(devices.map(async (item) => await updateDevice(item)));
}

async function updateDevice(device) {
    try {
        return await wrapContext(clsNamespace, async () => {
            setContext({ device: device.id });
            console.log("update " + device.id + " metadata =" + JSON.stringify(__getMetadata()));
            return publish("message", device.id);
        });
    } catch (error) {
        console.log(error);
    }

}

function setContext(context) {
    try {
        let ctxKeys = clsNamespace.get('contextKeys') ? clsNamespace.get('contextKeys') : [];
        for (const key in context) {
            clsNamespace.set(key, context[key]);
            if (ctxKeys.indexOf(key) === -1) {
                ctxKeys.push(key);
            }
        }
        clsNamespace.set('contextKeys', ctxKeys);
    } catch (error) {
        console.error(error);
        console.log('cannot set context', context);
        throw error;
    }
}

function publish(message, deviceId) {
    return new RSVP.Promise((resolve, reject) => {
        try {
            client.publish(message,
                deviceId,
                (error) => {
                    if (error) {
                        console.log("error")
                        reject(error);
                    } else {
                        console.log("publish " + deviceId + " metadata" + JSON.stringify(__getMetadata()));
                        resolve();
                    }
                });
        } catch (error) {
            console.log(error);
        }

    });
}

async function wrapContext(cls, callback) {
    let defer = RSVP.defer();
    let context = await cls.run(async (contextObj) => {
        try {
            let result = await callback(contextObj);
            defer.resolve(result);
        } catch (error) {
            defer.reject(error);
        }
    });
    return defer.promise;
};

function __getMetadata() {
    const metadata = {};
    let contextData = {};
    for (const key of clsNamespace.get('contextKeys') || []) {
        contextData[key] = clsNamespace.get(key);
    }

    for (const key in contextData) {
        metadata[key] = contextData[key];
    }
    return metadata;
}

输出如下:

update 0 metadata ={"device":0}
publish 0 metadata{"device":0}
update 1 metadata ={"device":1}
publish 1 metadata{"device":1}
... (same thing for 1165 devices)
update 1166 metadata ={"device":1166}
update 1167 metadata ={"device":1167}
update 1168 metadata ={"device":1168}
update 1169 metadata ={"device":1169}
... (same thing until 1199)
update 1199 metadata ={"device":1199}
publish 1166 metadata{"device":1199}
publish 1167 metadata{"device":1199}
publish 1168 metadata{"device":1199}
... (same thing until 1199)

如您所见,1165 首次发布日志的元数据是正确的,但是一旦迭代中断并且函数变得异步,则首次发布的元数据将不匹配。

有没有办法来解决这个问题?

4

0 回答 0