0

有什么方法可以使用 EasyNetQ 同步消费来自 RabbitMQ 的原始字节消息?

我需要保证对来自未以 EasyNetQ 格式发布的系统的消息进行有序处理和确认。我知道消费者在单个线程上运行,但该IAdvancedBus接口仅提供一种使用原始消息的方法:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

Task返回类型意味着消费者正在异步运行回调,因此可能会乱序处理消息。

如果没有,有什么想法可以更改代码以支持这一点吗?我会制作接口方法:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

并在 中实现它RabbitAdvancedBus,但我不确定代码的确切位置。

4

2 回答 2

0

这是个有趣的问题。我自己不是 EasyNetQ 专家,也许其他人会过来给你一个更好的答案。然而,我已经熟悉EasyNetQ 代码库大约一年了,在我看来,要了解连接消费者时(以及调用消费者时)发生的事情是很棘手的。

我首先要指出的是,仅仅通过改变方法的签名,并不能保证消息是按顺序处理的。例如,查看您建议的接口的这个实现:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        return new Task(() => { });
    };
    Consume(queue, taskWrapper);
}

它调用原始Consume方法,我们真的不知道之后会发生什么,对吧?

如果我在你的鞋子里,我会做以下事情之一:

  1. 使用官方 RabbitMq 客户端并使用那里的消息表单(这不是那么棘手!)
  2. 也许看看RawRabbit,这是我一直在贡献的 RabbitMq 之上的一个薄层(使用 vNext 标准)。它仅支持使用消息的异步签名,但编写同步实现Subscriber.cs(使用AsyncEx 之类的同步库)应该不难。
  3. 更改业务逻辑的建模。我不确定这是否适用于您的情况,但一般来说,如果以正确的顺序处理每条消息是关键任务,您应该以某种方式对其进行建模,以便使用方法可以验证该消息是下一条排队。(此外,我认为 EasyNetQ 不保证消息序列,因此您可能希望为每个新版本的框架验证它)。

希望这可以帮助!

于 2015-11-18T07:59:21.553 回答
0

我收到了适用于 EasyNetQ Google Group 的回复:

要同步执行,您可以这样做:

bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
    // do your synchronous work.....
    return Task.CompletedTask;
});

或添加扩展名:

using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;

namespace ConsoleApplication4
{
    public static class RabbitAdvancedBusConsumeExtension
    {
       public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
    }

    public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
    }

    private static Task ExecuteSynchronously(Action action)
    {
        var tcs = new TaskCompletionSource<object>();
        try
        {
            action();
            tcs.SetResult(null);
        }
        catch (Exception e)
        {
            tcs.SetException(e);
        }
        return tcs.Task;
    }
}

class Program
{
    static void Main(string[] args)
    {
        var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));

        var queue = bus.Advanced.QueueDeclare();
        bus.Advanced.Consume(queue, (bytes, properties, info) =>
        {
            // .....
        });
    }
}
}

更新:此功能已在 0.52.0.410 版中添加:

https://github.com/EasyNetQ/EasyNetQ/pull/505

于 2015-11-18T14:02:20.703 回答