我有一个 Feathers 应用程序,它使用 RabbitMQ 和一个自定义 amqplib 包装器来与在其他地方运行的其他代码进行通信,我正在努力编写一个好的集成测试,以显示收到消息时运行的回调运行正确。实际的回调只是获取接收到的消息的主体并调用内部服务将数据放入数据库中。
我有一个在测试环境中运行的 RabbitMQ 服务器,想法是编写一个测试,将一些虚拟数据发布到正确的交换,然后检查数据是否最终进入数据库。问题是在我检查数据库之前我不知道如何判断回调已经完成。
现在,我只是发布消息,然后在检查数据库之前使用超时等待几秒钟,但我不喜欢这样,因为不能保证回调会完成。
我正在测试的代码看起来像这样(不是实际代码只是一个示例):
const app = require('./app');
// handleAMQP is passed as a callback to the consumer
// it creates a new record in the myService database
const handleAMQP = async(message) => {
await app.service('users').create(message.content);
};
// Subscribe takes an amqp connection, opens a channel, and connects a callback
const subscribe = (conn) => {
let queue = 'myQueue';
let exchange = 'myExchange';
return conn.createChannel().then(function (ch) {
var ok = ch.assertExchange(exchange, 'topic', { durable: true });
ok = ok.then(function () {
return ch.assertQueue(queue, { exclusive: true });
});
ok = ok.then(function (qok) {
var queue = qok.queue;
ch.bindQueue(queue, exchange, topic);
});
ok = ok.then(function (queue) {
return ch.consume(queue, handleAMQP);
});
});
};
module.exports = {subscribe};
我的测试看起来像这样:
const assert = require('assert');
const amqp = require('amqplib');
describe('AMQP Pub/Sub Tests', async () => {
let exchange = 'myExchange';
let topic = 'myTopic';
let dummyData = {
email: 'example@example.com',
name: 'Example User'
}
it('creates a new db enry when amqp message recieved', async () => {
// Publish some dummy data
await amqp.connect('amqp://localhost').then((conn) => {
conn.createChannel().then((ch) => {
ch.assertExchange(exchange, 'topic', {durable: true}).then(() => {
ch.publish(exchange, topic, dummyData).then(() => {
ch.close();
})
});
});
});
await setTimeout(() => { // Wait three seconds
let result = app.service('users').find({email : 'example@example.com'}); // Attempt to find the newly created user
assert.deepEqual(result.email, dummyData.email);
assert.deepEqual(result.name, dummyData.name);
}, 3000);
});
});
在检查记录是否存在之前,不只是等待任意时间限制,有没有更好的方法来构建这个测试?
还是等待某个时间对于事件驱动功能完全有效?