我在创建“背压系统”时遇到了问题。我正在使用 Vertx HttpClient 和 RxJava。我需要向外部服务发出 6000 个请求,并且为了避免 waitForQueue 中的满,由于这个外部服务不能像我发送的那样快速处理,所以我在请求/响应之间设置了延迟。
由于此旅程是作为批处理过程进行的,因此无需担心需要一分钟。
这是我的代码
return from(subGroups)
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
.delay(50, TimeUnit.MILLISECONDS)
此方法从每 24 小时运行一次的 Observable 间隔调用,并通过此子组列表(6000)
但是在检查我的日志后,我看不到我的请求之间有 50 毫秒的延迟
这是我的 3 条日志
{"@timestamp":"2016-11-30T10:32:48.973+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/comercial?category=T15EB&clientId=ERROR_NOT_SUPPLIED","requestHash":189630582,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.978+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EE&clientId=ERROR_NOT_SUPPLIED","requestHash":1296199359,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.981+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EG&clientId=ERROR_NOT_SUPPLIED","requestHash":228306365,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
知道我需要做什么才能实现这一目标吗?
问候。
解决方案
我最终使用concatMap
Please 如果您有更好的解决方案,请告诉我
return from(subGroups)
.concatMap(subGroup -> Observable.just(subGroup).delay(50, TimeUnit.MILLISECONDS))
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))