我一直在通过以下示例设置具有订阅支持的 GraphQL .NET 项目:https ://github.com/graphql-dotnet/server/tree/develop/samples/Samples.Schemas.Chat
我似乎已经完全按照 ChatSchema 中的示例中概述的方式设置了所有内容,但我仍然无法使其正常工作。我从服务器得到的错误是:
Cannot subscribe as no result stream available
如果我向 /graphql 端点发送订阅查询,这是我在 UI Playground 中遇到的错误:
{
"error": {
"name": "FormatedError",
"message": "Unknown error",
"originalError": {
"data": null,
"extensions": {
"tracing": {
"version": 1,
"startTime": "2021-06-18T13:10:21.6759947Z",
"endTime": "2021-06-18T13:10:21.6769947Z",
"duration": 546000,
"parsing": {
"startOffset": 3200,
"duration": 87300
},
"validation": {
"startOffset": 92500,
"duration": 386800
},
"execution": {
"resolvers": []
}
}
}
}
}
}
这些是我在本地运行的 GraphQL .NET 项目中看到的日志:
[23:10:21 DBG] Handle start: 1
[23:10:21 DBG] Executing operation: order_created query: subscription order_created($eventId: ID!) {
order_subscription {
order_created(eventId: $eventId) {
eventId
eventTimestampUtc
}
}
}
[23:10:21 ERR] Cannot subscribe as no result stream available
[23:10:21 DBG] Handling message: 1 of type: stop
[23:10:21 DBG] Received message: Type: stop Id: 1 Payload:
[23:10:21 DBG] Handle stop: 1
[23:10:21 DBG] Subscription: 1 unsubscribed
我的Startup.cs
样子是这样的:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<OrderingPlatformSchema>();
services.AddSingleton<IOrderEventStreamService, OrderEventStreamService>();
services.AddSingleton<IDocumentExecuter, SubscriptionDocumentExecuter>();
services.AddGraphQL((options, provider) =>
{
var logger = provider.GetRequiredService<ILogger<Startup>>();
options.UnhandledExceptionDelegate = ctx => logger.LogError("{Error} occurred", ctx.OriginalException.Message);
})
.AddErrorInfoProvider((options) =>
{
options.ExposeExtensions = false;
options.ExposeExceptionStackTrace = true;
})
.AddDefaultEndpointSelectorPolicy()
.AddSystemTextJson(deserializerSettings => { }, serializerSettings => { })
.AddWebSockets()
.AddDataLoader()
.AddGraphTypes(typeof(OrderingPlatformSchema));
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
app.UseDeveloperExceptionPage();
app.UseWebSockets();
app.UseGraphQLWebSockets<OrderingPlatformSchema>();
app.UseGraphQL<OrderingPlatformSchema, GraphQLCustomLoggingMiddleware<OrderingPlatformSchema>>();
if (env.IsDevelopment())
{
app.UseGraphQLPlayground(new PlaygroundOptions
{
BetaUpdates = true,
RequestCredentials = RequestCredentials.Omit,
HideTracingResponse = false,
EditorCursorShape = EditorCursorShape.Line,
EditorTheme = EditorTheme.Light,
EditorFontSize = 14,
EditorReuseHeaders = true,
EditorFontFamily = "Consolas",
PrettierPrintWidth = 80,
PrettierTabWidth = 2,
PrettierUseTabs = true,
SchemaDisableComments = false,
SchemaPollingEnabled = true,
SchemaPollingEndpointFilter = "*localhost*",
SchemaPollingInterval = 5000
});
}
}
我的Schema
对象如下所示:
public class OrderingPlatformSchema : Schema
{
public OrderingPlatformSchema(IServiceProvider provider) : base(provider)
{
Query = new OrderingPlatformQuery();
Subscription = new OrderingPlatformSubscription();
}
}
OrderingPlatformSubscription
对象:
public class OrderingPlatformSubscription : ObjectGraphType<object>
{
public OrderingPlatformSubscription()
{
Name = nameof(OrderingPlatformSubscription);
Field<OrderSubscription>("order_subscription", resolve: context => new {});
}
}
OrderSubscription
对象:
internal class OrderSubscription : ObjectGraphType<object>
{
private readonly IOrderEventStreamService orderEventStreamService;
public OrderSubscription(IOrderEventStreamService orderEventStreamService)
{
this.orderEventStreamService = orderEventStreamService;
Name = nameof(OrderSubscription);
AddField(new EventStreamFieldType
{
Name = "order_created",
Arguments = new QueryArguments(
new QueryArgument<NonNullGraphType<IdGraphType>> { Name = "eventId" }
),
Type = typeof(OrderCreatedGraphType),
Resolver = new FuncFieldResolver<OrderCreatedEvent>(context => context.Source as OrderCreatedEvent),
Subscriber = new EventStreamResolver<OrderCreatedEvent>(SubscribeByEventId)
});
}
private IObservable<OrderCreatedEvent> SubscribeByEventId(IResolveEventStreamContext context)
{
var eventId = context.GetArgument<Guid>("eventId");
var events = orderEventStreamService.OrderCreatedEvents();
return events.Where(x => x.EventId == eventId);
}
}
最后,OrderEventStreamService
看起来像这样:
internal class OrderEventStreamService : IOrderEventStreamService
{
private readonly ISubject<OrderCreatedEvent> orderCreatedEventStream = new ReplaySubject<OrderCreatedEvent>(1);
public IObservable<OrderCreatedEvent> OrderCreatedEvents()
{
return orderCreatedEventStream.AsObservable();
}
}
非常感谢任何帮助/建议。