2

我有一个具有不同复杂性(过程持续时间)的 TxnTypes 列表。
我想TxnType从列表中找到匹配的。
我尝试通过混合流的并行处理和短路过滤器功能来实现它。
但我注意到它们没有混合在一起。
我写了下面的示例。但注意到并联和短路的混合无法正常工作。
每次运行都显示并行处理工作,但在找到项目后不会终止!!!

    class TxnType {
        public String id;   
        public TxnType(String id) {this.id = id;}
       
        public boolean match(String entry) {
            Date s = new Date();
            // simulate long processing match time TxnType
            if (id.equals("1")) {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Date f = new Date();
            System.out.println("check id = " + id+ "  duration = "+(f.getTime()- s.getTime()));
    
            return id.equalsIgnoreCase(entry);
        }
    }

     private void test4() {
        // build list of available TxnTypes
        ArrayList<TxnType> lst = new ArrayList<>();
        lst.add(new TxnType("0"));
        lst.add(new TxnType("1"));  // long match processing time type
        lst.add(new TxnType("2"));
        lst.add(new TxnType("3"));
        lst.add(new TxnType("4"));

        String searchFor = "3";
        System.out.println("searchFor = " + searchFor);
        Date st, fi;

        st = new Date();
        Optional<TxnType> found2 =lst.stream().parallel().filter(txnType->txnType.match(searchFor)).findFirst();
            System.out.println("found.stream().count() = " + found2.stream().count());
            fi= new Date();
            System.out.println("dur="+ (fi.getTime()- st.getTime()));
    }

多次运行,发现处理没有尽快结束,等待全部处理!!!!

searchFor = 3
check id = 4  duration = 0
check id = 2  duration = 0
check id = 3  duration = 0
check id = 0  duration = 0
check id = 1  duration = 4005
found.stream().count() = 1
dur=4050

有没有类似的东西FilterFindFirst()

4

3 回答 3

1

您的错误是使用findFirst,而不是findAny.

请注意,它1是在您希望找到的元素之前排序的 ( 3)。所以它必须先完成检查1,然后才能得出结论“3是与谓词匹配的第一个元素”,即使它们是并行完成的。如果它找到了3,并且还没有开始检查列表中的其他内容,那么它将不会开始。这就是短路的findFirst意思。

findAny另一方面,不关心订单。如果它找到任何满足谓词的元素,它就不再开始检查任何新的东西。

现在,即使您更改为findAny,您可能仍然会发现需要 4 秒才能完成。这是因为与流管道可以创建的线程数相比,列表中的元素太少。所以对所有元素的处理都开始了,一旦开始就不会中断,即使已经找到了满足谓词的元素。

如果您将更多元素放入列表中:

for (int i = 0 ; i < 100 ; i++) {
    lst.add(new TxnType("foo"));
}

...

Optional<TxnType> found2 = lst.parallelStream().filter(txnType -> txnType.match(searchFor)).findAny();

然后在处理完成1之前开始处理的可能性较小3,并且您将获得更快的运行。但这不会每次都发生。不能保证1之前不会得到处理3

基本上,短路工作正常。就是这样

  • findFirst不会像您希望的那样剧烈地短路
  • 您的列表中的元素太少,而您的计算机有足够的内核来一次处理它们,所以确实如此。
于 2021-06-26T08:09:38.303 回答
1

如果您查看FindOps.FindTask.doLeaf的源代码,您会看到,首先执行操作,然后检查是否找到结果。

假设有 4 个核心,列表中的一个元素将被传递给每个线程。一旦一个线程完成一个元素的执行,它就会变得空闲并选择下一个可用元素(如果有)。在您的情况下,很有可能在找到匹配项之前选择了列表的第 5 个元素来执行。

如果您尝试使用更长的列表,您可能会看到短路在起作用。

于 2021-06-26T08:10:11.670 回答
1

1 - 一个包含 5 个元素的列表太小了,可能每个线程只处理一个元素,最多 2 个

2 - 由于第二个元素是花费更多时间的元素,它很可能会在任何其他检查完成之前开始,也就是说,在找到任何元素之前 - 一旦match()方法启动,它就不会被中断

尝试更多元素:50 甚至 100 1
将长期运行的元素更改为第 10 个或更高版本。
为了好玩,还添加Thread.currentThread().getId()match().

你会看到

  • 即使使用2,也不总是启动长时间运行的线程findAny()
  • 并非所有元素都被检查

3 - 只是一个说明:奇怪的看到Stream混合Date(我更喜欢使用java.time类)


1 - 用 50 个元素测试自己,"9"是慢的,搜索"15"(或"3"
2 - 我看不到任何相关的差异(如果只使用 5 个元素,则没有)findAny()而不是findFirst()(与我的第一个猜测相反)
检查的顺序是两者都随机,有时findAny()运行慢线程而findFirst()没有

于 2021-06-26T08:54:37.473 回答