0

我正在使用 Alpakka 和 Akka 处理 CSV 文件。由于我有一堆 CSV 文件必须添加到同一个流中,因此我想添加一个包含来自文件名或请求的信息的字段。目前我有这样的事情:

val source = FileIO.fromPath(Paths.get("10002070.csv"))
  .via(CsvParsing.lineScanner())

它流式传输 ByteStrings(字段)的列表(行)序列。目标是这样的:

val filename = "10002070.csv"
val source = FileIO.fromPath(Path.get(filename))
    .via(CsvParsing.lineScanner())
    .via(AddCSVFieldHere(filename))

创建类似于以下的结构:

10002070.csv,max,estimated,12,1,0

其中文件名是原始源中不存在的字段。

我认为在流中注入值看起来不太漂亮,而且最终我想确定在读取目录的流阶段传递给解析的文件名。

通过流阶段传递值以供以后重用的正确/规范方法是什么?

4

1 回答 1

1

您可以转换流map以将文件名添加到每个List[ByteString]

val fileName = "10002070.csv"
val source =
  FileIO.fromPath(Path.get(fileName))
    .via(CsvParsing.lineScanner())
    .map(List(ByteString(fileName)) ++ _)

例如:

Source.single(ByteString("""header1,header2,header3
                           |1,2,3
                           |4,5,6""".stripMargin))
  .via(CsvParsing.lineScanner())
  .map(List(ByteString("myfile.csv")) ++ _)
  .runForeach(row => println(row.map(_.utf8String)))

// The above code prints the following:
// List(myfile.csv, header1, header2, header3)
// List(myfile.csv, 1, 2, 3)
// List(myfile.csv, 4, 5, 6)

相同的方法适用于您不知道文件名的更一般情况。如果您想读取目录中的所有文件(假设所有这些文件都是 csv 文件),将文件连接到单个流中,并在每个流元素中保留文件名,那么您可以使用 Alpakka 的Directory实用程序在以下方式:

val source =
  Directory.ls(Paths.get("/my/dir")) // Source[Path, NotUsed]
    .flatMapConcat { path =>
       FileIO.fromPath(path)
         .via(CsvParsing.lineScanner())
         .map(List(ByteString(path.getFileName.toString)) ++ _)
    }
于 2018-03-26T19:58:06.387 回答