0

我的 ocelot 配置如下所示(仅相关部分):

"Routes": [
    {
      "DownstreamPathTemplate": "/api/service1/json/{pageSize}/{pageNo}/all/{partnerId}",
      "DownstreamScheme": "http",
      "UpstreamPathTemplate": "/a",
      "UpstreamHttpMethod": [ "Get" ],
      "ServiceName": "Service1",
      "LoadBalancerOptions": {
        "Type": "LeastConnection"
      },
      "Key": "v0-service1",
      "Priority": 1
    },
    {
      "DownstreamPathTemplate": "/api/service2/json/{pageSize}/{pageNo}/{partnerId}",
      "DownstreamScheme": "http",
      "UpstreamPathTemplate": "/b",
      "UpstreamHttpMethod": [ "Get" ],
      "ServiceName": "Service2",
      "LoadBalancerOptions": {
        "Type": "LeastConnection"
      },
      "Key": "v0-service2",
      "Priority": 1
    }
    ],
    "Aggregates": [
    {
      "RouteKeys": [
        "v0-service1",
        "v0-service2"
      ],
      "UpstreamPathTemplate": "/api/services/{partnerId}?pageNo={pageNo}&pageSize={pageSize}",
      "Aggregator": "GetDataAggregator"
    }
]

聚合器暂时定义如下:

public class GetOffersAggregator : IDefinedAggregator
{
    public async Task<DownstreamResponse> Aggregate(List<HttpContext> responses)
    {
        var readers = responses.Select(r => new StreamReader(r.Response.Body)).ToList();
        var objects = await Task.WhenAll(readers.Select(r => r.ReadToEndAsync()));

        readers.ForEach(r => r.Dispose());

        throw new NotImplementedException();
    }
}

现在,如果我在两个下游服务中都设置了断点,那么当我调用"/api/services/{partnerId}?pageNo={pageNo}&pageSize={pageSize}". 问题是两个响应都是 404(即使 api 网关控制台记录 200 OK 响应也是正确的,因为两个断点都被命中并且服务正在返回数据)。如果我检查HttpContexts 传递给我的聚合器,我可以看到请求对象包含完全错误的地址。看起来好像 Ocelot 正确地调用了我的端点,但同时调用了一些不存在的端点并将它们的结果传递给我的聚合器。有任何想法吗?

编辑:

我可以在 HttpContext 的 Items 属性中看到我的服务响应。该死,这看起来真的像是图书馆作者方面的一些严重错误,还是设计使然?

4

1 回答 1

1

我遇到了同样的情况,你只需要更新你引用数据的方式。数据实际上存在于responses[index].Items.DownstreamResponse().Content 中。

这是我所做的一个似乎有效的小例子。由于下游内容是压缩的,因此我有两个辅助函数可以解压缩并将内容转换为 Json 对象。

internal class DemoAggregator : IDefinedAggregator
    {
        public async Task<DownstreamResponse> Aggregate(List<HttpContext> responses)
        {
            List<Header> header = new List<Header>();
            try
            {
                var headers = responses.SelectMany(x => x.Items.DownstreamResponse().Headers).ToList();

                var oneByteArray = await responses[0].Items.DownstreamResponse().Content.ReadAsByteArrayAsync();
                var oneData = Decompress(oneByteArray);
                var oneObj = ConvertToJson(oneData);
                var oneContent = new StringContent(JsonConvert.SerializeObject(oneObj), Encoding.UTF8, "application/json");

                return new DownstreamResponse(oneContent, HttpStatusCode.OK, headers, "OK");
            }
            catch (Exception ex)
            {
                return new DownstreamResponse(null, System.Net.HttpStatusCode.InternalServerError, header, null);
            }
        }

        private static byte[] Decompress(byte[] data)
        {
            using (var compressedStream = new MemoryStream(data))
            using (var zipStream = new GZipStream(compressedStream, CompressionMode.Decompress))
            using (var resultStream = new MemoryStream())
            {
                zipStream.CopyTo(resultStream);
                return resultStream.ToArray();
            }
        }

        private static JObject ConvertToJson(byte[] data)
        {
            JObject jObj;
            using (var ms = new MemoryStream(data))
            using (var streamReader = new StreamReader(ms))
            using (var jsonReader = new JsonTextReader(streamReader))
            {
                jObj = (JObject)JToken.ReadFrom(jsonReader);
            }
            return jObj;
        }
    }
于 2021-03-02T20:54:54.530 回答