4

我正在开发 asp.net core 2.0 webapi 并想要一个后台任务来处理来自 kafka 消息总线的消息。我阅读了一些关于 IHostedService 的文档并创建了一个自定义后台服务。我正在使用 MediatR 实施 CQRS。

我已经在 Autofac 中注册了 MediatR 模块。我需要 Meditatr 对象在自定义托管服务中可用。谁能帮我实现这一目标?

autofac.json

{
  "modules": [
    {
      "type": "Producer.Infrastructure.Modules.MediatRModule",
      "properties": {
      }
    }
  ]
}

Autofac模块:

namespace Producer.Infrastructure.Modules
{
    using Autofac;
    using Autofac.Features.Variance;
    using Producer.Application.Commands.Blogs;
    using MediatR;
    using System.Collections.Generic;
    using System.Reflection;

    public class MediatRModule : Autofac.Module
    {
        protected override void Load(ContainerBuilder builder)
        {
            builder.RegisterSource(new ContravariantRegistrationSource());

            builder
                .RegisterType<Mediator>()
                .As<IMediator>()
                .InstancePerLifetimeScope();

            builder
                .Register<SingleInstanceFactory>(ctx => {
                    var c = ctx.Resolve<IComponentContext>();
                    return t => { object o; return c.TryResolve(t, out o) ? o : null; };
                })
                .InstancePerLifetimeScope();

            builder
                .Register<MultiInstanceFactory>(ctx => {
                    var c = ctx.Resolve<IComponentContext>();
                    return t => (IEnumerable<object>)c.Resolve(typeof(IEnumerable<>).MakeGenericType(t));
                })
                .InstancePerLifetimeScope();

            builder.RegisterAssemblyTypes(typeof(CreateBlogCommand).GetTypeInfo().Assembly).AsImplementedInterfaces(); // via assembly scan
        }
    }
}

程序.cs

公共类程序 { public static void Main(string[] args) { BuildWebHost(args).Run(); }

public static IWebHost BuildWebHost(string[] args)
{
    return WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>()
            .ConfigureAppConfiguration((builderContext, config) =>
            {
                IHostingEnvironment env = builderContext.HostingEnvironment;
                config.AddJsonFile("autofac.json");
            })
            .ConfigureServices(services => services.AddAutofac())
            .Build();
}

}

启动.cs

    IServiceProvider serviceProvider;
    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
        var brokerList = Configuration.GetSection("Kafka").GetValue<string>("BrokerList");
        var topic = Configuration.GetSection("Kafka").GetValue<string>("Topic");

        //Add framework services
        services.AddMvc().AddJsonOptions(options =>
        {
            options.SerializerSettings.ContractResolver = new CamelCasePropertyNamesContractResolver();
        });

        services.AddSingleton<IHostedService>(s => new BackgroundService(brokerList, topic));

        // Create an Autofac Container and push the framework services
        var containerBuilder = new ContainerBuilder();
        containerBuilder.Populate(services);

        //Register your own services within Autofac
        containerBuilder.RegisterModule(new ConfigurationModule(Configuration));

        var container = containerBuilder.Build();
        serviceProvider = container.Resolve<IServiceProvider>();

        return serviceProvider;
    }

后台服务

    public class BackgroundService : HostedService
    {
        public readonly string brokerList;
        public readonly string topic;


        public BackgroundService(string brokerList, string topic)
        {
            this.brokerList = brokerList;
            this.topic = topic;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
//I need to access the Mediatr here???
            }
        }
    }

谢谢

4

0 回答 0