所以目前我有一个带有阻塞队列的 Executor 实现,具体的实现就像,我有每个请求的项目列表,我将它们划分为分区,然后计算每个分区,最后将它们连接起来以获得最终列表。
如何在 LMAX 中实现它?我看到一旦我有分区并将它们推入 RingBuffer,每个分区都被视为单独的项目,所以我自定义加入它们。就像是,
ConcurrentHashMap<Long, LongAdder> map = new ConcurrentHashMap<>();
@Override
public List<SomeTask> score(final List<SomeTask> tasks) {
long id = tasks.get(0).id;
map.put(id, new LongAdder());
for (SomeTask task : tasks) {
producer.onData(task);
}
while (map.get(id).intValue() != tasks.size()) ;
map.remove(id);
return tasks;
}
有干净的方法吗?我专门查看了https://github.com/LMAX-Exchange/disruptor/tree/master/src/test/java/com/lmax/disruptor/example和 KeyedBatching,但它们似乎在一个线程上进行批处理和执行。
目前对我来说,每个分区占用大约 200 毫秒,我想并行执行它们。
任何帮助是极大的赞赏。