我在 .net core 2.1 后端使用 Graphql.net 订阅。基本上,订阅与消息服务通信。为了确保将消息传递给呼叫者,将我的消息服务创建为 Singleton。许多 Hangfire 工作需要使用此消息服务。每个作业都从 DI 获得消息服务。我需要的是使用 Redis 进行 PubsUp。这样,我就不需要单例消息服务。
订阅;
public void Activate(ObjectGraphType objectGraph, IHostingEnvironment env, IServiceProvider sp)
{
objectGraph.AddField(new EventStreamFieldType
{
Name = "messageSubscription",
Type = typeof(MyMessageGType),
Resolver = new FuncFieldResolver<MyMessage>(context => context.Source as MyMessage),
Arguments = new QueryArguments(
new QueryArgument<NonNullGraphType<StringGraphType>> { Name = "operationId" },
new QueryArgument<NonNullGraphType<CommandWatchModeGType>> { Name = "watchMode" }
),
Subscriber = new EventStreamResolver<MyMessage>(context =>
{
var subscriptionService = (IMyMessageService)sp.GetService(typeof(IMyMessageService));
var operationId = context.GetArgument<string>("operationId");
var watchMode = context.GetArgument<CommandWatchMode>("watchMode");
return subscriptionService.GetMessages(operationId,watchMode);
})
});
}
留言服务,
public class MyMessageService: IMyMessageService
{
private readonly ConcurrentDictionary<string, IObservable<MyMessage>> Subscriptions =
new ConcurrentDictionary<string, IObservable<MyMessage>>();
private readonly ISubject<MyMessage> _messageStream = new ReplaySubject<MyMessage>(0);
public MyMessage AddBscMessage(MyMessage message)
{
_messageStream.OnNext(message);
return message;
}
public IObservable<MyMessage> GetMessages(string operationId, CommandWatchMode watchMode)
{
var key = $"{operationId}_{watchMode}";
if (!Subscriptions.ContainsKey(key))
{
var mess = _messageStream
.Where(message =>
message.OperationId == operationId
).Select(s => s)
.AsObservable();
Subscriptions.TryAdd(key, mess);
return mess;
}
else
{
return Subscriptions[key];
}
}
}
直接投资登记处;
services.AddSingleton<IBscAfadExecuteOtherService, BscAfadExecuteOtherService>();
在这种情况下如何使用 Redis?