我是 Java 8 Stream API 的新手,在以下场景中使用它时遇到了问题:
我必须逐行读取文件并以它们的大小最接近某个字符限制的方式对行进行分组,然后将其发布到 Kafka。
public void publishStringToKafka(File outputFile) {
try {
Files.lines(outputFile.toPath())
.forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));
} catch (IOException e) {
LOG.error("Could not read buffered file to send message on kafka.", e);
} finally {
try {
Files.deleteIfExists(outputFile.toPath());
} catch (IOException e) {
LOG.error("Problem in deleting the buffered file {}.", outputFile.getName(), e);
}
}
}
现在我完全习惯于使用传统或声明式风格,即逐行读取文件,使用循环组合它们,并在大小最接近 1024 个字符时继续在 kafka 上发布消息。但我想为此使用流。
注意:我在这段代码中遇到了另一个问题,即Files.deleteIfExists(outputFile.toPath());
命令在执行后不会删除文件,也不会发生异常。而如果我使用声明式样式,则文件将被成功删除。
请帮忙。