我有传入的处理请求,由于耗尽共享资源,我不希望同时处理太多。我也希望共享一些唯一键的请求不要同时执行:
def process(request: Request): Observable[Answer] = ???
requestsStream
.groupBy(request => request.key)
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
requestsForKey
.flatMap(1, process)
})
但是,上述方法不起作用,因为每个键的可观察对象永远不会完成。实现这一目标的正确方法是什么?
什么不起作用:
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
requestsForKey.take(1)
.flatMap(1, process)
})
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
// This discards all requests after the first if processing takes more than 100 milliseconds
requestsForKey.timeout(100.millis, Observable.empty)
.flatMap(1, process)
})