0

我有传入的处理请求,由于耗尽共享资源,我不希望同时处理太多。我也希望共享一些唯一键的请求不要同时执行:

def process(request: Request): Observable[Answer] = ???

requestsStream
  .groupBy(request => request.key)
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
      requestsForKey
         .flatMap(1, process)
  })

但是,上述方法不起作用,因为每个键的可观察对象永远不会完成。实现这一目标的正确方法是什么?

什么不起作用:

  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
      // Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
      requestsForKey.take(1)
         .flatMap(1, process)
  })

 .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
      // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
      // This discards all requests after the first if processing takes more than 100 milliseconds
      requestsForKey.timeout(100.millis, Observable.empty)
         .flatMap(1, process)
  })
4

1 回答 1

1

以下是我设法实现这一目标的方法。对于每个唯一键,我分配了专用的单线程调度程序(以便按顺序处理具有相同键的消息):

@Test
public void groupBy() throws InterruptedException {
    final int NUM_GROUPS = 10;
    Observable.interval(1, TimeUnit.MILLISECONDS)
            .map(v -> {
                logger.info("received {}", v);
                return v;
            })
            .groupBy(v -> v % NUM_GROUPS)
            .flatMap(grouped -> {
                long key = grouped.getKey();
                logger.info("selecting scheduler for key {}", key);
                return grouped
                        .observeOn(assignScheduler(key))
                        .map(v -> {
                            String threadName = Thread.currentThread().getName();
                            Assert.assertEquals("proc-" + key, threadName);
                            logger.info("processing {} on {}", v, threadName);
                            return v;
                        })
                        .observeOn(Schedulers.single()); // re-schedule
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(1000);
}

在我的情况下,键的数量(NUM_GROUPS)很小,所以我为每个键创建了专用的调度程序:

Scheduler assignScheduler(long key) {
    return Schedulers.from(Executors.newSingleThreadExecutor(
        r -> new Thread(r, "proc-" + key)));
}

如果键的数量是无限的或太大而无法为每个键分配一个线程,您可以创建一个调度程序池并像这样重用它们:

Scheduler assignScheduler(long key) {
    // assign randomly
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)];
}
于 2017-01-17T12:35:14.127 回答