0

我一直在通过以下示例设置具有订阅支持的 GraphQL .NET 项目:https ://github.com/graphql-dotnet/server/tree/develop/samples/Samples.Schemas.Chat

我似乎已经完全按照 ChatSchema 中的示例中概述的方式设置了所有内容,但我仍然无法使其正常工作。我从服务器得到的错误是:

Cannot subscribe as no result stream available

如果我向 /graphql 端点发送订阅查询,这是我在 UI Playground 中遇到的错误:

{
  "error": {
    "name": "FormatedError",
    "message": "Unknown error",
    "originalError": {
      "data": null,
      "extensions": {
        "tracing": {
          "version": 1,
          "startTime": "2021-06-18T13:10:21.6759947Z",
          "endTime": "2021-06-18T13:10:21.6769947Z",
          "duration": 546000,
          "parsing": {
            "startOffset": 3200,
            "duration": 87300
          },
          "validation": {
            "startOffset": 92500,
            "duration": 386800
          },
          "execution": {
            "resolvers": []
          }
        }
      }
    }
  }
}

这些是我在本地运行的 GraphQL .NET 项目中看到的日志:

[23:10:21 DBG] Handle start: 1
[23:10:21 DBG] Executing operation: order_created query: subscription order_created($eventId: ID!) {
  order_subscription {
    order_created(eventId: $eventId) {
      eventId
      eventTimestampUtc
    }
  }
}
[23:10:21 ERR] Cannot subscribe as no result stream available
[23:10:21 DBG] Handling message: 1 of type: stop
[23:10:21 DBG] Received message: Type: stop Id: 1 Payload: 
[23:10:21 DBG] Handle stop: 1
[23:10:21 DBG] Subscription: 1 unsubscribed

我的Startup.cs样子是这样的:

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<OrderingPlatformSchema>();

            services.AddSingleton<IOrderEventStreamService, OrderEventStreamService>();

            services.AddSingleton<IDocumentExecuter, SubscriptionDocumentExecuter>();

            services.AddGraphQL((options, provider) =>
            {
                var logger = provider.GetRequiredService<ILogger<Startup>>();
                
                options.UnhandledExceptionDelegate = ctx => logger.LogError("{Error} occurred", ctx.OriginalException.Message);
            })
            .AddErrorInfoProvider((options) =>
            {
                options.ExposeExtensions = false;
                options.ExposeExceptionStackTrace = true;
            })
            .AddDefaultEndpointSelectorPolicy()
            .AddSystemTextJson(deserializerSettings => { }, serializerSettings => { })
            .AddWebSockets()
            .AddDataLoader()
            .AddGraphTypes(typeof(OrderingPlatformSchema));
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
                app.UseDeveloperExceptionPage();

            app.UseWebSockets();

            app.UseGraphQLWebSockets<OrderingPlatformSchema>();
            app.UseGraphQL<OrderingPlatformSchema, GraphQLCustomLoggingMiddleware<OrderingPlatformSchema>>();

            if (env.IsDevelopment())
            {
                app.UseGraphQLPlayground(new PlaygroundOptions
                {
                    BetaUpdates = true,
                    RequestCredentials = RequestCredentials.Omit,
                    HideTracingResponse = false,

                    EditorCursorShape = EditorCursorShape.Line,
                    EditorTheme = EditorTheme.Light,
                    EditorFontSize = 14,
                    EditorReuseHeaders = true,
                    EditorFontFamily = "Consolas",

                    PrettierPrintWidth = 80,
                    PrettierTabWidth = 2,
                    PrettierUseTabs = true,

                    SchemaDisableComments = false,
                    SchemaPollingEnabled = true,
                    SchemaPollingEndpointFilter = "*localhost*",
                    SchemaPollingInterval = 5000
                });
            }
        }

我的Schema对象如下所示:

    public class OrderingPlatformSchema : Schema
    {
        public OrderingPlatformSchema(IServiceProvider provider) : base(provider)
        {
            Query = new OrderingPlatformQuery();
            Subscription = new OrderingPlatformSubscription();
        }
    }

OrderingPlatformSubscription对象:

    public class OrderingPlatformSubscription : ObjectGraphType<object>
    {
        public OrderingPlatformSubscription()
        {
            Name = nameof(OrderingPlatformSubscription);

            Field<OrderSubscription>("order_subscription", resolve: context => new {});
        }
    }

OrderSubscription对象:

    internal class OrderSubscription : ObjectGraphType<object>
    {
        private readonly IOrderEventStreamService orderEventStreamService;

        public OrderSubscription(IOrderEventStreamService orderEventStreamService)
        {
            this.orderEventStreamService = orderEventStreamService;

            Name = nameof(OrderSubscription);

            AddField(new EventStreamFieldType
            {
                Name = "order_created",
                Arguments = new QueryArguments(
                    new QueryArgument<NonNullGraphType<IdGraphType>> { Name = "eventId" }
                ),
                Type = typeof(OrderCreatedGraphType),
                Resolver = new FuncFieldResolver<OrderCreatedEvent>(context => context.Source as OrderCreatedEvent),
                Subscriber = new EventStreamResolver<OrderCreatedEvent>(SubscribeByEventId)
            });
        }

        private IObservable<OrderCreatedEvent> SubscribeByEventId(IResolveEventStreamContext context)
        {
            var eventId = context.GetArgument<Guid>("eventId");

            var events = orderEventStreamService.OrderCreatedEvents();

            return events.Where(x => x.EventId == eventId);
        }
    }

最后,OrderEventStreamService看起来像这样:

    internal class OrderEventStreamService : IOrderEventStreamService
    {
        private readonly ISubject<OrderCreatedEvent> orderCreatedEventStream = new ReplaySubject<OrderCreatedEvent>(1);

        public IObservable<OrderCreatedEvent> OrderCreatedEvents()
        {
            return orderCreatedEventStream.AsObservable();
        }
    }

非常感谢任何帮助/建议。

4

0 回答 0