我正在尝试使用 ArtemisNetClient 在 C# 中实现请求响应模式,但是在真正的解决方案中找出如何以更通用的方式执行此操作时遇到了一些麻烦。
基于一些 Java 示例,我能够在两个控制台应用程序中执行类似的操作:
发件人
static async System.Threading.Tasks.Task Main(string[] args)
{
var connectionFactory = new ConnectionFactory();
var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
var connection = await connectionFactory.CreateAsync(endpoint);
string guid = new Guid().ToString();
var requestAddress = "TRADE REQ1";
var responseAddress = "TRADE RESP";
Message message = new Message("BUY AMD 1000 SHARES");
message.SetCorrelationId(guid);
message.ReplyTo = responseAddress;
var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
await producer.SendAsync(message);
var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
var responseMessage = await consumer.ReceiveAsync();
Console.WriteLine(responseMessage.GetBody<string>());
}
接收者
static async System.Threading.Tasks.Task Main(string[] args)
{
// Create connection
var connectionFactory = new ConnectionFactory();
var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
var connection = await connectionFactory.CreateAsync(endpoint);
var requestAddress = "TRADE REQ1";
// Create consumer to receive trade request messages
var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
var message = await consumer.ReceiveAsync();
Console.WriteLine($"Received message: {message.GetBody<string>()}");
// Confirm trade request and ssend response message
if (!string.IsNullOrEmpty(message.ReplyTo))
{
Message responseMessage = new Message("Confirmed trade request");
responseMessage.SetCorrelationId(message.CorrelationId);
var producer = await connection.CreateProducerAsync(message.ReplyTo);
await producer.SendAsync(responseMessage);
}
}
这按预期工作,但我希望在本文中描述更多内容,除了它没有任何请求响应模式的示例。
详细地说,我目前有两个要通信的服务。
在服务 1中,我想创建并发布一条消息,然后等待响应以丰富实例对象并将其保存到数据库中。我目前有这个,但它缺少等待响应消息。
public async Task<Instance> CreateInstance(Instance instance)
{
await _instanceCollection.InsertOneAsync(instance);
var @event = new InstanceCreated
{
Id = instance.Id,
SiteUrl = instance.SiteUrl
};
await _messageProducer.PublishAsync(@event);
return instance;
}
我想我可能需要在其中设置一个临时队列/连接或其他东西PublishAsync()
并将其更改为例如Task<Message>
支持返回响应消息。但是我该怎么做呢?我是否必须像控制台应用程序示例中那样做一个新的 connectionfactory +CreateConsumerAsync
等?
public class MessageProducer
{
private readonly IAnonymousProducer _producer;
public MessageProducer(IAnonymousProducer producer)
{
_producer = producer;
}
public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
{
var serialized = JsonSerializer.Serialize(message);
var address = typeof(T).Name;
var msg = new Message(serialized);
if (replyTo != null && correlationId != null)
{
msg.CorrelationId = correlationId;
msg.ReplyTo = replyTo;
}
await _producer.SendAsync(address, msg);
}
public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
{
var serialized = JsonSerializer.Serialize(message);
var address = routeName;
var msg = new Message(serialized);
if(replyTo != null && correlationId != null)
{
msg.CorrelationId = correlationId;
msg.ReplyTo = replyTo;
}
await _producer.SendAsync(address, msg);
}
}
在服务 2中,我有一个InstanceCreatedConsumer
接收消息的,但它再次缺少返回响应消息的方法。
public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
private readonly MessageProducer _messageProducer;
public InstanceCreatedConsumer(MessageProducer messageProducer)
{
_messageProducer = messageProducer;
}
public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
{
// consume message and return response
}
}
我想我也许可以ActiveMqExtensions
用 a 扩展类,ConsumeAsync
并HandleMessage
用返回值处理响应消息,但我还没有做到这一点。
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
RoutingType routingType)
where TConsumer : class, ITypedConsumer<TMessage>
{
builder.Services.TryAddScoped<TConsumer>();
builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
return builder;
}
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
where TConsumer : class, ITypedConsumer<TMessage>
{
try
{
var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message);
}
catch(Exception ex)
{
// todo
}
}
我在这里尝试实现的目标是完全错误的,还是使用 ArtemisNetClient 是不可能的?
也许有人有一个例子或者可以确认我是否走在正确的道路上,或者我应该使用不同的框架。
我是通过 ActiveMQ Artemis 等消息进行这种通信的新手,因此不胜感激。