我正在使用 Alpakka 和 Akka 处理 CSV 文件。由于我有一堆 CSV 文件必须添加到同一个流中,因此我想添加一个包含来自文件名或请求的信息的字段。目前我有这样的事情:
val source = FileIO.fromPath(Paths.get("10002070.csv"))
.via(CsvParsing.lineScanner())
它流式传输 ByteStrings(字段)的列表(行)序列。目标是这样的:
val filename = "10002070.csv"
val source = FileIO.fromPath(Path.get(filename))
.via(CsvParsing.lineScanner())
.via(AddCSVFieldHere(filename))
创建类似于以下的结构:
10002070.csv,max,estimated,12,1,0
其中文件名是原始源中不存在的字段。
我认为在流中注入值看起来不太漂亮,而且最终我想确定在读取目录的流阶段传递给解析的文件名。
通过流阶段传递值以供以后重用的正确/规范方法是什么?