您可能想试试 `observe`。我确信有更好的方法来做到这一点,但下面这个例子可以帮您解决眼前的问题:
您原来的代码(可以编译并运行):
import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global
import java.net.URL
import java.nio.file.Paths
/** Streams the content of a URL into a local file using fs2 and cats-effect `IO`. */
object Example1 {
  /** `ContextShift[IO]` built on the global execution context; needed implicitly by the fs2 I/O calls. */
  implicit val contextShift: ContextShift[IO] = IO.contextShift(global)

  /**
    * Describes a download of `spec` into `filename`; nothing runs until the stream is compiled and executed.
    *
    * @param spec     URL to read from, as a string
    * @param filename path of the file the bytes are written to
    * @return a stream that copies the URL's bytes into the file
    */
  def download(spec: String, filename: String): fs2.Stream[IO, Unit] = {
    // Opening the connection is suspended in IO so it only happens when the stream runs.
    val openInput = IO(new URL(spec).openConnection.getInputStream)
    val source = io.readInputStream[IO](openInput, 4096, global, closeAfterUse = true)
    val sink = io.file.writeAll[IO](Paths.get(filename), global)
    source.through(sink)
  }

  /** Entry point: downloads a fixed URL to a fixed path and runs the effect synchronously. */
  def main(args: Array[String]): Unit = {
    val program = download("https://isitchristmas.com/", "/tmp/christmas.txt").compile.drain
    program.unsafeRunSync()
  }
}
使用 `observe` 统计字节数:
import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global
import java.net.URL
import java.nio.file.Paths
/** Streams the content of a URL into a local file and reports how many bytes were transferred. */
object Example2 {
  /** `ContextShift[IO]` built on the global execution context; needed implicitly by the fs2 I/O calls. */
  implicit val contextShift: ContextShift[IO] = IO.contextShift(global)

  /** Result of a download: the total number of bytes read from the source. */
  final case class DlResults(bytes: Long)

  /**
    * Describes a download of `spec` into `filename` that also counts the bytes read.
    *
    * The byte stream is sent to the file sink via `observe` and simultaneously folded into a
    * running total. The total is accumulated one chunk at a time (`chunk.size`) rather than
    * one byte at a time, which yields the same count with far fewer fold steps and allocations.
    *
    * NOTE(review): `global` is also passed to `readInputStream` and `writeAll`; presumably a
    * dedicated pool for blocking work would be preferable outside of a demo. Confirm against
    * the fs2 version in use.
    *
    * @param spec     URL to read from, as a string
    * @param filename path of the file the bytes are written to
    * @return a stream emitting a single [[DlResults]] with the byte total
    */
  def download(spec: String, filename: String): fs2.Stream[IO, DlResults] =
    io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse = true)
      .observe(io.file.writeAll(Paths.get(filename), global))
      .chunks
      .fold(DlResults(0L)) { (acc, chunk) => DlResults(acc.bytes + chunk.size) }

  /** Entry point: downloads a fixed URL to a fixed path and prints the resulting [[DlResults]]. */
  def main(args: Array[String]): Unit =
    download("https://isitchristmas.com/", "/tmp/christmas.txt")
      // Print inside IO instead of hiding the side effect in a fold's combining function.
      .evalMap(result => IO(println(result)))
      .compile
      .drain
      .unsafeRunSync()
}
输出:
> DlResults(42668)