4

如何使用fs2filtered从当前流文件中删除行并获取过滤行数作为返回类型?

例如:如果old.txt包含由换行符 (\n) 分隔的字符串:

 john
 sam
 chen
 yval
 ....

val myList = List("chen","yval")

def converter[F[_]](implicit F: Sync[F]): F[Unit] =
  io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => myList.contains(s))//remove this from the old file and write to new file
    .intersperse("\n")
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get("testdata/new.txt")))
    .compile.drain

// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
4

1 回答 1

3

您可以使用类的观察方法Stream

您正在寻找一个函数def converter[F[_]: Sync]: F[Int],它产生一个计算F[Int],其结果(类型Int)是过滤的行数,其效果是将这些行写入输出文件。以管道类比为例,您希望将过滤后的流提供给两个输出,一个用于结果,一个用于效果。您可以使用函数observe来做到这一点,定义为

def observe(sink: Sink[F, O])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O] 

ASink[F,O]是函数的别名Stream[F, O] => Stream[F, Unit]。在您的情况下,接收是将过滤后的流写入输出文件的代码的一部分:

def writeToFile[F[_]: Sync]: Sink[F, String] = 
  _.intersperse("\n")
  .through(text.utf8Encode)
  .through(io.file.writeAll(Paths.get("testdata/new.txt")))

另一个输出是减少,或者更确切地说折叠,

  def converter[F[_]](implicit F: Effect[F], ec: ExecutionContext): F[Int] = 
    io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => myList.contains(s))
      .observe(writeToFile[F])
      .compile
      .fold[Int](0)( (c, _) => c+1)
}

注意:对于此解决方案,您需要将 type-class 限制F为 be Effect,并且您需要使用ExecutionContext. fold是在类中定义的ToEffect

于 2018-07-12T23:23:38.530 回答