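First of all, please excuse my English; it is not very good. I am using MassTransit with Azure Service Bus for asynchronous communication between microservices. I need to use middleware filters to add metadata to messages. The problem is that the filters for Send() and Publish() are instantiated once for every class the message inherits from and for every interface it implements. To register the filters, I use the extension methods "UseSendFilter()/UsePublishFilter()".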

Is there a way to have the send/publish filters invoked only for the concrete type of the message?

Below is an example that I hope will help explain the problem.

Send filter

public class SendFilterTest<T> : IFilter<SendContext<T>> where T : class
{
    private readonly ILogger<SendFilterTest<T>> _logger;

    public SendFilterTest(ILogger<SendFilterTest<T>> logger = null)
    {
        _logger = logger ?? NullLogger<SendFilterTest<T>>.Instance;
    }
    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        try
        {
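            // structured logging template instead of string interpolation; the rendered text is unchanged
            _logger.LogInformation("SendFilterInstance: {FilterInstance}. MessageId: {MessageId}. NextType: {NextType} ",
                GetHashCode(), context.MessageId, next.GetType().GetFriendlyName(shortName: true));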
        }
        finally
        {
            // here the next filter in the pipe is called
            await next.Send(context);
        }
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope(this.GetType().Name);
    }
}
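
For illustration, here is a minimal sketch of one possible workaround: a guard that lets a generic filter skip its work unless its type argument T is the runtime type of the message. The helper name ConcreteTypeGuard is hypothetical, and the three types are empty stand-ins for the hierarchy used in this question, so the snippet compiles without MassTransit. It does not prevent the filter from being instantiated per type; it only makes the extra invocations do nothing.

```csharp
using System;

// Stand-in types mirroring the hierarchy in this question (no MassTransit dependency).
public interface ICommand { }
public interface IOutcomingCommand : ICommand { }
public sealed class TestIntegrationCommand : IOutcomingCommand { }

// Hypothetical helper, not part of MassTransit.
public static class ConcreteTypeGuard
{
    // True only when the generic argument T is the message's runtime type,
    // i.e. not one of its interfaces or base classes.
    public static bool IsConcreteType<T>(T message) where T : class =>
        message != null && message.GetType() == typeof(T);
}
```

Inside SendFilterTest<T>.Send, the metadata logic could then be wrapped in a check such as ConcreteTypeGuard.IsConcreteType(context.Message), assuming the message instance is reachable through the typed send context, while next.Send(context) is still awaited unconditionally so the pipe continues for every type.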

Generic filter registration method

        //in this case sendFilterTypes = new List<Type>() { typeof(SendFilterTest<>) }

     ...
        //send filters middleware register
        if (sendFilterTypes != null)
        {
            foreach (var sendFilterType in sendFilterTypes.Where(t => t != null))
            {
                cfg.UseSendFilter(sendFilterType, context);
            }
        }

        //publish filters middleware register
        if (publishFilterTypes != null)
        {
            foreach (var publishFilterType in publishFilterTypes.Where(t => t != null))
            {
                cfg.UsePublishFilter(publishFilterType, context);
            }
        }

        cfg.ReceiveEndpoint(shortMyQueueName, e =>
        {
            e.ConfigureConsumers(context);
        });
    ...

The message being sent and its interfaces

public sealed class TestIntegrationCommand : IOutcomingCommand
{
    public Uri GetDestinationQueueUri()
    {
        return new("queue:test-queue");
    }
}

[ExcludeFromTopology]
public interface IOutcomingCommand : ICommand
{
    [Obsolete("Use property 'Id' instead")]
    Guid CommandId => Id;
    Uri GetDestinationQueueUri();
}

[ExcludeFromTopology]
public interface ICommand
{
    Guid Id => Guid.NewGuid();
}

Send method

...
   var testCommand = new TestIntegrationCommand();
   await (await bus.GetSendEndpoint(testCommand.GetDestinationQueueUri())).Send(testCommand);
...

Output

[2021-04-07 20:15:49Z] info: UnitTests.DependencyInjection.CqrsServicesTest[0]
  StartTest
[2021-04-07 20:15:49Z] info: MassTransit[0]
      Configured endpoint test-queue, Consumer: UnitTests.TestDomain.TestAggregateConsumer

[2021-04-07 20:15:49Z] info: MassTransit[0]
      Configured endpoint test-queue, Consumer: UnitTests.TestDomain.TestEvent1Consumer
[2021-04-07 20:15:50Z] info: MassTransit[0]
      Bus started: sb://tms-testing.servicebus.windows.net/
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
      SendFilterInstance: 17664976. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: MergePipe<SendContext<UnitTests.TestDomain.TestIntegrationCommand>, SendContext<Tms.Framework.Cqrs.Core.Commands.ICommand>> 
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
      SendFilterInstance: 39332178. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: MergePipe<SendContext<Tms.Framework.Cqrs.Core.Commands.Outcoming.IOutcomingCommand>, SendContext<Tms.Framework.Cqrs.Core.Commands.ICommand>> 
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
      SendFilterInstance: 26669753. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: MergePipe<SendContext<UnitTests.TestDomain.TestIntegrationCommand>, SendContext<Tms.Framework.Cqrs.Core.Commands.Outcoming.IOutcomingCommand>> 
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
      SendFilterInstance: 15215027. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: Last<SendContext<UnitTests.TestDomain.TestIntegrationCommand>> 
[2021-04-07 20:15:51Z] info: UnitTests.TestDomain.TestAggregateConsumer[0]
      Consuming 'TestIntegrationCommand'. Id:UnitTests.TestDomain.TestIntegrationCommand. Count: 0. DO NOTHING!
[2021-04-07 20:15:51Z] info: MassTransit[0]
      Bus stopped: sb://tms-testing.servicebus.windows.net/
[2021-04-07 20:15:52Z] info: UnitTests.DependencyInjection.CqrsServicesTest[0]
      FinishTest
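
The NextType entries in the log above name three types: TestIntegrationCommand, IOutcomingCommand and ICommand. As a rough illustration, plain reflection over the message class yields the same set. This is only a sketch with empty stand-in types and a hypothetical helper named MessageTypeLister; it is not MassTransit's own logic for resolving message types.

```csharp
using System;
using System.Linq;

// Stand-in types mirroring the hierarchy in this question (no MassTransit dependency).
public interface ICommand { }
public interface IOutcomingCommand : ICommand { }
public sealed class TestIntegrationCommand : IOutcomingCommand { }

// Hypothetical helper, not part of MassTransit.
public static class MessageTypeLister
{
    // Returns the class itself plus every interface it implements
    // (directly or through interface inheritance), sorted by name
    // because GetInterfaces() does not guarantee an order.
    public static string[] GetMessageTypeNames(Type messageType) =>
        new[] { messageType }
            .Concat(messageType.GetInterfaces())
            .Select(t => t.Name)
            .OrderBy(n => n, StringComparer.Ordinal)
            .ToArray();
}

// MessageTypeLister.GetMessageTypeNames(typeof(TestIntegrationCommand))
// returns: ICommand, IOutcomingCommand, TestIntegrationCommand
```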

Thank you very much.

Regards,

Borja