1

我们需要在庞大的数据集上以动态方式按多个字段进行分组。数据存储在 Hazelcast Jet 集群中。示例:如果Person类包含 4 个字段:agename和。我们首先需要按城市分组,然后按国家分组,然后我们可以根据条件参数按名称分组。citycountry

我们已经尝试过使用分布式收集但无法正常工作。即使我们尝试使用 Pipeline API,它也会抛出错误。

代码:

    IMap res= client.getMap("res"); // res is distrbuted map
    Pipeline p = Pipeline.create();
    JobConfig jobConfig = new JobConfig();
    p.drawFrom(Sources.<Person>list("inputList"))
     .aggregate(AggregateOperations.groupingBy(Person::getCountry))
     .drainTo(Sinks.map(res));      
    jobConfig = new JobConfig();
    jobConfig.addClass(Person.class);
    jobConfig.addClass(HzJetListClientPersonMultipleGroupBy.class);
    Job job = client.newJob(p, jobConfig);
    job.join();

然后我们在客户端读取地图并销毁它。

服务器上的错误消息:

原因:java.lang.ClassCastException:java.util.HashMap 无法转换为 java.util.Map$Entry

4

1 回答 1

4

groupingBy将所有输入项聚合到HashMap使用给定函数提取键的位置。在您的情况下,它将项目流聚合Person为单个HashMap<String, List<Person>>项目。

你需要使用这个:

        p.drawFrom(Sources.<Person>list("inputList"))
         .groupingKey(Person::getCountry)
         .aggregate(AggregateOperations.toList())
         .drainTo(Sinks.map(res));

res这将使用每个城市的人员列表填充地图。

请记住,没有groupingKey()聚合总是全局的。也就是说,输入中的所有项目都将聚合为一个输出项目。

于 2019-07-01T06:48:24.530 回答