我有 2 个带有排序数据的 csv 文件:文件 1:排序的数字(~1GB) 文件 2:排序的数字 + 额外数据(~20GB)
我需要在文件 2 中查找文件 1 中的所有数字并进行一些处理(跳过文件 2 中不存在于文件 1 中的数字)。
到目前为止,我有:
object MainQueue extends IOApp {
override def run(args: List[String]): IO[ExitCode] =
program[IO].compile.drain.as(ExitCode.Success)
def program[F[_]: Sync: ContextShift](): Stream[F, Unit] =
for {
number <- numberStream
record <- records
.through(parser())
.through(findRecord(number))
_ <- Stream.emit(println(s"$number <-> $record"))
} yield ()
def findRecord[F[_]](phone: Long): Pipe[F, Long, Long] =
_.dropWhile(r => {
println(s"Reading $r")
r < phone
}).head //halts the stream
def numberStream[F[_]](): Stream[F, Long] =
Stream(100L, 120L)
//TODO: make stream continue and not halt and restart
def records[F[_]: Sync: ContextShift](): Stream[F, String] =
Stream
.resource(Blocker[F])
.flatMap { bec =>
readAll[F](Paths.get("small.csv"), bec, 4096)
}
.through(text.utf8Decode)
.through(text.lines)
def parser[F[_]](): Pipe[F, String, Long] = ??? //parse
def writer[F[_]](): Pipe[F, Long, Unit] =
_.map(v => {
println(s"Found: $v")
})
}
哪个打印:
Reading 50
Reading 100
100 <-> 100
Reading 50
Reading 100
Reading 120
120 <-> 120
这意味着第二个流为文件 1 中的每个值重新启动,我如何保持上次读取的位置并从那里开始?数字是排序的,所以没有点重新开始。我对 scala 和 fs2 非常陌生,因此我将不胜感激解释我的误解。
谢谢!