2

我正在尝试使用 ArtemisNetClient 在 C# 中实现请求响应模式,但是在真正的解决方案中找出如何以更通用的方式执行此操作时遇到了一些麻烦。

基于一些 Java 示例,我能够在两个控制台应用程序中执行类似的操作:

发件人

static async System.Threading.Tasks.Task Main(string[] args)
{
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    string guid = new Guid().ToString();

    var requestAddress = "TRADE REQ1";
    var responseAddress = "TRADE RESP";

    Message message = new Message("BUY AMD 1000 SHARES");
    message.SetCorrelationId(guid);
    message.ReplyTo = responseAddress;

    var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
    await producer.SendAsync(message);

    var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
    var responseMessage = await consumer.ReceiveAsync();

    Console.WriteLine(responseMessage.GetBody<string>());
    
}

接收者

static async System.Threading.Tasks.Task Main(string[] args)
{
    // Create connection
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    var requestAddress = "TRADE REQ1";

    // Create consumer to receive trade request messages
    var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
    var message = await consumer.ReceiveAsync();

    Console.WriteLine($"Received message: {message.GetBody<string>()}");

    // Confirm trade request and ssend response message
    if (!string.IsNullOrEmpty(message.ReplyTo))
    {
        Message responseMessage = new Message("Confirmed trade request");
        responseMessage.SetCorrelationId(message.CorrelationId);
        var producer = await connection.CreateProducerAsync(message.ReplyTo);
        await producer.SendAsync(responseMessage);
    }
}

这按预期工作,但我希望在本文中描述更多内容,除了它没有任何请求响应模式的示例。

详细地说,我目前有两个要通信的服务。

服务 1中,我想创建并发布一条消息,然后等待响应以丰富实例对象并将其保存到数据库中。我目前有这个,但它缺少等待响应消息。

public async Task<Instance> CreateInstance(Instance instance)
{
    await _instanceCollection.InsertOneAsync(instance);

    var @event = new InstanceCreated
    {
        Id = instance.Id,
        SiteUrl = instance.SiteUrl
    };

    await _messageProducer.PublishAsync(@event);

    return instance;
}

我想我可能需要在其中设置一个临时队列/连接或其他东西PublishAsync()并将其更改为例如Task<Message>支持返回响应消息。但是我该怎么做呢?我是否必须像控制台应用程序示例中那样做一个新的 connectionfactory +CreateConsumerAsync等?

public class MessageProducer
{
    private readonly IAnonymousProducer _producer;

    public MessageProducer(IAnonymousProducer producer)
    {
        _producer = producer;
    }

    public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = typeof(T).Name;
        var msg = new Message(serialized);
        if (replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }

    public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = routeName;
        var msg = new Message(serialized);
        if(replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }
}

服务 2中,我有一个InstanceCreatedConsumer接收消息的,但它再次缺少返回响应消息的方法。

public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
    private readonly MessageProducer _messageProducer;
    public InstanceCreatedConsumer(MessageProducer messageProducer)
    {
        _messageProducer = messageProducer;
    }
    public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
    {
        // consume message and return response
    }
}

我想我也许可以ActiveMqExtensions用 a 扩展类,ConsumeAsyncHandleMessage用返回值处理响应消息,但我还没有做到这一点。

public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
    RoutingType routingType)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    builder.Services.TryAddScoped<TConsumer>();
    builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
    return builder;
}

private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    try
    {
        var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
        using var scope = serviceProvider.CreateScope();
        var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
        await typedConsumer.ConsumeAsync(msg, token);
        await consumer.AcceptAsync(message);
    }
    catch(Exception ex)
    {
        // todo
    }
}

我在这里尝试实现的目标是完全错误的,还是使用 ArtemisNetClient 是不可能的?

也许有人有一个例子或者可以确认我是否走在正确的道路上,或者我应该使用不同的框架。

我是通过 ActiveMQ Artemis 等消息进行这种通信的新手,因此不胜感激。

4

1 回答 1

2

从您的应用程序的角度来看,我在 ArtemisNetClient 中看不到任何可以简化请求/响应模式的内容。人们可能会期待类似于 JMS' 的东西QueueRequestor,但我在代码中看不到任何类似的东西,我也没有在文档中看到类似的东西。

我建议您简单地在您的应用程序中执行您在示例中所做的操作(即手动创建消费者和生产者以分别处理每一端的响应)。我建议的唯一更改是重用连接,以便您创建尽可能少的连接。连接池在这里是理想的。


就其价值而言,在我看来,ArtemisNetClient 的第一个版本仅在 3 个月前发布,根据 GitHub,除了 2 个对代码库的提交之外,其他所有提交都来自一个开发人员。ArtemisNetClient 可能会成长为一个非常成功的 C# 客户端实现,但在这一点上它似乎相对不成熟。即使现有代码是高质量的,如果客户端周围没有一个稳固的社区,那么它也很可能没有必要的支持来获得及时的错误修复、新功能等。只有时间会证明一切

于 2021-05-21T15:26:39.423 回答