我在 asp.net 核心中创建了一个 RestApi,并且在我的一项服务中,我使用 RawRabbit 向 RabbitMQ 发布了一条消息。话虽如此,当我评论订阅者部分时,我看到消息已在 RabbitMQ 控制面板中发布,消费者数量为 0,当我添加订阅者部分时,消费者的数量变为 1,并且消息正在被消费,所以那里控制面板中没有消息,但奇怪的是订阅者中的所有代码(在这种情况下它只是一个日志)都没有运行。
发布者部分:
public async void RaiseAsync(string event_name, ShoppingCartItemAdded data) {
const string EXCHANGE_NAME = "myRabbit";
Action<IPublishContext> x = (ctx) => ctx.UsePublishConfiguration(xfg => xfg.OnExchange(EXCHANGE_NAME));//.WithRoutingKey("shoppingcartitemadded"));
await this.Client.PublishAsync<ShoppingCartItemAdded>(data, x );
}
订阅者部分:
public class ShoppingCartItemAddedConsumer
{
private readonly ILogger<ShoppingCartItemAddedConsumer> logger;
private readonly IBusClient client;
public ShoppingCartItemAddedConsumer(ILogger<ShoppingCartItemAddedConsumer> logger, IBusClient client)
{
this.logger = logger;
this.client = client;
this.logger.LogInformation("Subscriber created");
}
public async void Run()
{
const string QUEUE_NAME = "myWebApi";
const string EXCHANGE_NAME = "myRabbit";
this.logger.LogInformation("Registering subscriber");
await client.SubscribeAsync<ShoppingCartItemAdded>(async msg => {
this.logger.LogInformation("Message received from rabbitmq : {message}", msg);
}, ctx => ctx.UseSubscribeConfiguration(cfg =>
cfg.OnDeclaredExchange(dex => dex.WithName(EXCHANGE_NAME)
.WithAutoDelete(false)
.WithDurability(true)
.WithType(ExchangeType.Topic)
)
.FromDeclaredQueue(dq => dq.WithName(QUEUE_NAME)
.WithExclusivity(false)
.WithDurability(true)
.WithAutoDelete(false))));
this.logger.LogInformation("Subscriber registered");
}
}
我将订阅者注册为单例服务,我Run()在 Startup.cs 中调用它的方法
var consumer = app.ApplicationServices.GetRequiredService<ShoppingCartItemAddedConsumer>();
consumer.Run();
这是服务启动时的日志:
[09:02:25 INF] Subscriber created
[09:02:26 INF] Registering subscriber
[09:02:26 INF] Configuration action for shoppingcartitemadded found.
[09:02:26 INF] Declaring queue myWebApi.
[09:02:26 INF] Declaring exchange myRabbit.
[09:02:26 INF] Binding queue myWebApi to exchange myRabbit with routing key shoppingcartitemadded
[09:02:26 INF] Preparing to consume message from queue 'myWebApi'.
[09:02:26 INF] Subscriber registered
以下是发布消息时的日志:
[14:13:35 INF] Setting 'Publish Acknowledge' for channel '3'
[14:13:35 INF] Setting up publish acknowledgement for 1 with timeout 0:00:01
[14:13:35 INF] Sequence 1 added to dictionary
[14:13:35 WRN] No body found in the Pipe context.
[14:13:35 INF] Performing basic publish with routing key shoppingcartitemadded on exchange myRabbitAPI.
[14:13:35 INF] Setting up publish acknowledgement for 2 with timeout 0:00:01
[14:13:35 INF] Sequence 2 added to dictionary
[14:13:35 WRN] No body found in the Pipe context.
[14:13:35 INF] Performing basic publish with routing key shoppingcartitemadded on exchange myRabbitAPI.
[14:13:35 INF] Executed action method HelloMicroServices.Controllers.ShoppingCartController.Post (HelloMicroServices), returned result Microsoft.AspNetCore.Mvc.OkObjectResult in 304.6292ms.
[14:13:35 INF] Executing ObjectResult, writing value of type 'HelloMicroServices.Datastores.Models.ShoppingCart'.
[14:13:35 INF] Executed action HelloMicroServices.Controllers.ShoppingCartController.Post (HelloMicroServices) in 414.4309ms
[14:13:35 INF] Request finished in 475.1868ms 200 application/json; charset=utf-8
[14:13:35 INF] Recieived ack for 1
[14:13:35 INF] Recieived ack for 2