-1

我希望根据某些字段值确保并行流中的顺序。也许我会介绍一些抽象的例子:

假设我们有类 User

@Lombok.Data()
class User {
   private String firstName;
   private String lastName;
   private int someValue;
   private int priority;
}

我们有这些用户的列表:

List<User> users = someInitUsersFunction();

我想强制并行流按优先级处理每个用户,假设我们有 100 个优先级为 0 的用户、100 个优先级为 1 的用户和 100 个优先级为 2 的用户。

我只想在优先级 1 完成和优先级 0 完成时启动具有优先级 2 的进程用户。

我认为

mvn 安装 -T 4

可能是我正在寻找的方法(第一个构建独立模块)。可以在java流中做到这一点吗?也可以使用替代品。

我的方法是按优先级划分到特定列表,然后按列表处理列表

4

3 回答 3

1

分块按优先级处理用户,但并行处理相同优先级的用户,先按优先级对用户进行分组,然后分别处理每个组。

users.stream().collect(Collectors.groupingBy(User::getPriority, TreeMap::new, Collectors.toList()))
        .values().stream()
        .forEachOrdered(list -> // sequential, in priority order
            list.parallelStream().forEach(user -> { // parallel, unordered
                // process user here
            }));

没有嵌套流,并为清楚起见进行了注释:

// Group users by priority
TreeMap<Integer, List<User>> usersByPriority = users.stream()
        .collect(Collectors.groupingBy(User::getPriority, TreeMap::new, Collectors.toList()));

// Process groups in priority order
for (List<User> list : usersByPriority.values()) {

    // Process users of current priority in parallel
    list.parallelStream().forEach(user -> {
        // process user here
    });

    // We won't loop to next priority until all users with current priority has been processed
}
于 2020-04-16T12:18:30.190 回答
0

Sequential vs parallel不一样ordering

如果你有一个有序的流并做一些保证保持顺序的操作,那么流是并行处理还是顺序处理并不重要;执行必须保持秩序。

在您的情况下(仅当您的列表不尊重您想要的顺序时..)您可以使用特定的Comparator或让 基于字段的User实现。然后在开始执行其他流操作之前对列表进行排序。并行或顺序不会给出不同的结果Comparablepriority

或使用特定的集合类型,如SortedSetPriorityQueue

关于 iterator() 方法中提供的 Iterator 的一些注意事项PriorityQueue不能保证以任何特定顺序遍历 PriorityBlockingQueue 的元素。

因此,您必须在流式传输期间对元素进行排序以保持顺序->stream().sorted()或简单地poll对其使用方法。

于 2020-04-16T11:51:24.220 回答
0

如果要1优先处理前100个,然后是后100个,依此类推,它既不是并行也不是并发,而是实际上是顺序的。这些部分子列表可以并行处理。一个PriotityQueue或者SortedMap是要走的路。

排序地图:

使用方法TreeSet内部的实现Collectors.groupingBy

Map<Integer, List<User>> map = users.stream()
    .collect(Collectors.groupingBy(
         User::getPriority,
         TreeMap::new,
         Collectors.toList()));

地图按优先级(键)排序。

优先队列:

  • Group by User::getPrioritytoMap<Integer, List<User>>按优先级对子列表进行分组
  • PriorityQueue按优先级添加到比较中
  • 并行处理顺序轮询的列表

从分组开始:

Map<Integer, List<User>> map = users.stream().collect(Collectors.groupingBy(User::getPriority);

此时,地图将如下所示:

[User(firstName=C, lastName=C, someValue=0, priority=1)]
[User(firstName=A, lastName=A, someValue=0, priority=2)]
[User(firstName=B, lastName=B, someValue=0, priority=3), User(firstName=D, lastName=D, someValue=0, priority=3)]

PriorityQueue从地图创建:

 Queue<List<User>> queue = map.entrySet()
        .stream()
        .collect(
            () -> new PriorityQueue<>(Comparator.comparingInt(list -> list.get(0).getPriority())),
            (pq, entry) -> pq.add(entry.getValue()),
            AbstractQueue::addAll);

遍历队列尊重优先级,并且子集可能会被并行处理,因为它们具有相同的优先级。

for (List<User> users : queue) {
    users.stream().parallel()...
}
于 2020-04-16T12:24:48.967 回答