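I noticed that if I use the StreamEx library to make my stream parallel with a custom ForkJoinPool, as shown below, the subsequent operations do run in parallel threads from that pool. However, if I add a map() operation and then make the stream parallel, only one thread from the pool is used.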
Below is the complete code of a minimal working example that demonstrates the problem. The only difference between the executeAsParallelFromList() and executeAsParallelAfterMap() methods is the .map(...) call added before .parallel().

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import one.util.streamex.StreamEx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelExample {
    private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
    // Custom pool with three worker threads, shared by both methods
    private static final ForkJoinPool s3ThreadPool = new ForkJoinPool(3);

    public static List<String> getTestList() {
        int listSize = 10;
        List<String> testList = new ArrayList<>();
        for (int i = 0; i < listSize; i++) {
            testList.add("item_" + i);
        }
        return testList;
    }

    // Case 1: .parallel(pool) is called directly on the source stream
    public static void executeAsParallelFromList() {
        logger.info("executeAsParallelFromList():");
        List<String> testList = getTestList();
        StreamEx<String> streamOfItems = StreamEx
                .of(testList)
                .parallel(s3ThreadPool);
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }

    // Case 2: identical, except that .map(...) comes before .parallel(pool)
    public static void executeAsParallelAfterMap() {
        logger.info("executeAsParallelAfterMap():");
        List<String> testList = getTestList();
        StreamEx<String> streamOfItems = StreamEx
                .of(testList)
                .map(item -> item + "_mapped")
                .parallel(s3ThreadPool);
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }

    private static void handleItem(String item) {
        // do something with the item - just print for now
        logger.info("I'm handling item: {}", item);
    }
}
Unit test that executes both methods:
public class ParallelExampleTest {
    @Test
    public void testExecuteAsParallelFromList() {
        ParallelExample.executeAsParallelFromList();
    }

    @Test
    public void testExecuteAsParallelFromStreamEx() {
        ParallelExample.executeAsParallelAfterMap();
    }
}
Output:
08:49:12.992 [main] INFO marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_7
08:49:13.043 [main] INFO marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9_mapped
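As you can see, all three threads are used when executing executeAsParallelFromList(), but only one thread is used when executing executeAsParallelAfterMap().
Why?
Thanks!
Marina
Note: this example is deliberately simplistic; I tried to keep it as small as possible while still demonstrating the problem. Obviously, in real life there is much more going on in map(), handleItem(), etc., and the input data is more interesting (I am trying to process AWS S3 buckets/prefixes in parallel).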
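For anyone who wants to check thread usage without reading through the log lines, here is a minimal sketch that tallies how many items each thread handles. It rests on several assumptions: it uses plain java.util.stream rather than StreamEx so that it runs without extra dependencies, it relies on the widely used (but, as far as I know, not formally documented) behavior that a parallel stream started from inside a ForkJoinPool task runs in that pool, and the names ThreadUsageProbe and itemsPerThread are made up for illustration. It does not reproduce or explain the StreamEx behavior above; it only shows the measuring technique.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not part of StreamEx or the original example.
public class ThreadUsageProbe {

    // Runs a parallel map + forEach pipeline from inside the given pool and
    // returns "thread name -> number of items handled by that thread".
    public static Map<String, Integer> itemsPerThread(List<String> items, ForkJoinPool pool) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        pool.submit(() ->
                items.parallelStream()
                     .map(item -> item + "_mapped")
                     // record which thread handled this item
                     .forEach(item -> counts.merge(Thread.currentThread().getName(), 1, Integer::sum))
        ).join(); // wait until the whole pipeline has finished
        return counts;
    }

    public static void main(String[] args) {
        List<String> items = IntStream.range(0, 10)
                .mapToObj(i -> "item_" + i)
                .collect(Collectors.toList());
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            // The distribution across threads varies from run to run.
            System.out.println(itemsPerThread(items, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Swapping the body of the submitted task for the StreamEx pipeline under test should give one map per variant, which makes the "three threads versus one thread" difference easy to compare side by side.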