0

我在 .net core 2.1 后端使用 Graphql.net 订阅。基本上,订阅与消息服务通信。为了确保将消息传递给呼叫者,将我的消息服务创建为 Singleton。许多 Hangfire 工作需要使用此消息服务。每个作业都从 DI 获得消息服务。我需要的是使用 Redis 进行 PubsUp。这样,我就不需要单例消息服务。

订阅;

public void Activate(ObjectGraphType objectGraph, IHostingEnvironment env, IServiceProvider sp)
        {
            objectGraph.AddField(new EventStreamFieldType
            {
                Name = "messageSubscription",
                Type = typeof(MyMessageGType),
                Resolver = new FuncFieldResolver<MyMessage>(context => context.Source as MyMessage),
                Arguments = new QueryArguments(
                    new QueryArgument<NonNullGraphType<StringGraphType>> { Name = "operationId" },
                    new QueryArgument<NonNullGraphType<CommandWatchModeGType>> { Name = "watchMode" }
                ),
                Subscriber = new EventStreamResolver<MyMessage>(context =>
                {
                    var subscriptionService = (IMyMessageService)sp.GetService(typeof(IMyMessageService));
                    var operationId = context.GetArgument<string>("operationId");
                    var watchMode = context.GetArgument<CommandWatchMode>("watchMode");

                    return subscriptionService.GetMessages(operationId,watchMode);
                })
            });
        }

留言服务,

public class MyMessageService: IMyMessageService
    {    
        private readonly ConcurrentDictionary<string, IObservable<MyMessage>> Subscriptions = 
            new ConcurrentDictionary<string, IObservable<MyMessage>>();

        private readonly ISubject<MyMessage> _messageStream = new ReplaySubject<MyMessage>(0);

        public MyMessage AddBscMessage(MyMessage message)
        {
            _messageStream.OnNext(message);
            return message;
        }

        public IObservable<MyMessage> GetMessages(string operationId, CommandWatchMode watchMode)
        {
            var key = $"{operationId}_{watchMode}";

            if (!Subscriptions.ContainsKey(key))
            {
                var mess = _messageStream
                    .Where(message =>
                        message.OperationId == operationId 
                    ).Select(s => s)
                    .AsObservable();

                Subscriptions.TryAdd(key, mess);
                return mess;
            }
            else
            {
                return Subscriptions[key];
            }
        }
    }

直接投资登记处;

services.AddSingleton<IBscAfadExecuteOtherService, BscAfadExecuteOtherService>();

在这种情况下如何使用 Redis?

4

0 回答 0