2

我是 Java 8 Stream API 的新手,在以下场景中使用它时遇到了问题:

我必须逐行读取文件并以它们的大小最接近某个字符限制的方式对行进行分组,然后将其发布到 Kafka。

    public void publishStringToKafka(File outputFile) {
        try {
            Files.lines(outputFile.toPath())
                    .forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));
        } catch (IOException e) {
            LOG.error("Could not read buffered file to send message on kafka.", e);
        } finally {
            try {
                Files.deleteIfExists(outputFile.toPath());
            } catch (IOException e) {
                LOG.error("Problem in deleting the buffered file {}.", outputFile.getName(), e);
            }
        }
    } 

现在我完全习惯于使用传统或声明式风格,即逐行读取文件,使用循环组合它们,并在大小最接近 1024 个字符时继续在 kafka 上发布消息。但我想为此使用流。

注意:我在这段代码中遇到了另一个问题,即Files.deleteIfExists(outputFile.toPath());命令在执行后不会删除文件,也不会发生异常。而如果我使用声明式样式,则文件将被成功删除。

请帮忙。

4

2 回答 2

2

在这种Collectors.groupingBy()情况下将很有用。

Map<T, List<String>> result = Files.lines(outputFile.toPath())
  .collect(Collectors.groupingBy(Your::classifier, Collectors.toList()))

结果,您得到一个Map<T,List<String>>. T是 Your::classifier 返回的类型。现在您已经将所有内容分组并可以继续进行 for-each。

现在您可以提取条目集、对其进行排序、对其进行平面映射并发布到 Kafka。flatMap是必要的,因为如果你不扁平化你的结构,你最终会迭代Stream<List<>>. 这不一定是一件坏事,但我认为这不是理想的情况。

 collect.entrySet().stream()
   .sorted(Comparator.comparing(Map.Entry::getKey))
   .flatMap(e -> e.getValue().stream())
   .forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));

唯一棘手的部分是适当地实现分类器方法,但从我了解到的问题中,您知道如何做到这一点。

于 2016-11-17T06:15:00.633 回答
2

问题陈述您想要做的是将流中的所有字符串按顺序组合到尽可能接近最大数量的字符并创建它们的新列表。然后可以使用这个新创建的列表流式传输到 Kafka。这不是一个容易解决的问题,因为您必须处理状态。

解决方案

使用 aCollector来累积值

 List<String> result = someStrings.stream()
                                  .collect(ArrayList::new, (list, string) -> accumulate(list, string), List::addAll);

accumulate方法包含最大字符逻辑:

 private void accumulate(ArrayList<String> list, String string) {
        if (list.isEmpty() || list.get(list.size() -1).length() + string.length() > MAXIMUM_CHARACTERS){
            list.add(string);
        } else {
            list.set(list.size()-1, list.get(list.size()-1) + string);
        }
    }

如果您输入列表 [as, 1234, 213, bd, de] 并将 MAXIMUM_CHARACTERS 设置为 5,它将返回所需的输出 [as, 1234, 213bd, de]。

于 2016-11-17T07:55:54.790 回答