2

我在创建“背压系统”时遇到了问题。我正在使用 Vertx HttpClient 和 RxJava。我需要向外部服务发出 6000 个请求,并且为了避免 waitForQueue 中的满,由于这个外部服务不能像我发送的那样快速处理,所以我在请求/响应之间设置了延迟。

由于此旅程是作为批处理过程进行的,因此无需担心需要一分钟。

这是我的代码

return from(subGroups)
        .flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
        .delay(50, TimeUnit.MILLISECONDS)

此方法从每 24 小时运行一次的 Observable 间隔调用,并通过此子组列表(6000)

但是在检查我的日志后,我看不到我的请求之间有 50 毫秒的延迟

这是我的 3 条日志

{"@timestamp":"2016-11-30T10:32:48.973+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/comercial?category=T15EB&clientId=ERROR_NOT_SUPPLIED","requestHash":189630582,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.978+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EE&clientId=ERROR_NOT_SUPPLIED","requestHash":1296199359,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.981+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EG&clientId=ERROR_NOT_SUPPLIED","requestHash":228306365,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}

知道我需要做什么才能实现这一目标吗?

问候。

解决方案

我最终使用concatMapPlease 如果您有更好的解决方案,请告诉我

return from(subGroups)
        .concatMap(subGroup -> Observable.just(subGroup).delay(50, TimeUnit.MILLISECONDS))
        .flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
4

1 回答 1

1

请注意,这delay只会延迟发射,因此基本上是在浪费时间。

如果您可以使用最多 10 个同时请求/连接来查询远程系统,则可以使用 2-parameter flatMap

return from(subGroups)
    .flatMap(subGroup -> getProductIdsForSubGroup(subGroup), 10);
于 2016-11-30T15:55:37.180 回答