1

我有 2 个集合,一个是“列表”,另一个是“pairRdd2”,其中包含如下所述的数据。

如果 mypairRdd2 包含列表中提到的所有值,我正在尝试使用 containsAll 应用过滤器。预期结果是 joe,{US,UK}

List<String> list = Arrays.asList("US","UK");

JavaRDD pairRdd = ctx.parallelize(Arrays.asList(new Tuple2("john","US"),new Tuple2("john","UAE"),new Tuple2("joe","US"),new Tuple2("joe","UK")));

JavaPairRDD<String, String> pairRdd2 = JavaPairRDD.fromJavaRDD(pairRdd);

pairRdd2.groupByKey().filter(x-> Arrays.asList(x._2).containsAll(list)).foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {

    @Override
    public void call(Tuple2<String, Iterable<String>> t) throws Exception {
        System.out.println(t._1());             
    }
});

有人可以强调我做错了什么...

4

1 回答 1

1

问题出在Arrays.asList(). 这将创建一个Iterables 列表,这不是您执行过滤器所需的。您应该使用groupBy自己给出的列表:

    pairRdd2.groupByKey().filter(f -> {
        Set<String> set = new HashSet<>();
        for(String s: f._2())
            set.add(s);

        return list.containsAll(set);
    });

您还可以找到一种将可迭代/迭代器转换为集合并完全避免循环的快速方法。

于 2018-02-20T12:41:28.353 回答