3

在以下scalaz-stream(取自文档)的使用示例中,如果输入和/或输出是 gzip 压缩文件,我需要更改什么?换句话说,我该如何使用compress

import scalaz.stream._
import scalaz.concurrent.Task

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run
4

1 回答 1

5

压缩输出很容易。由于compress.deflate()is aProcess1[ByteVector, ByteVector]您需要将其插入您正在发出ByteVectors 的管道中(即紧随其后text.utf8Encode的是 a Process1[String, ByteVector]):

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .pipe(compress.deflate())
    .to(io.fileChunkW("testdata/celsius.zip"))
    .run

因为inflate你不能io.linesR用来读取压缩文件。您需要一个生成ByteVectors 而不是Strings 的进程,以便将它们通过管道传输到inflate. (您可以使用io.fileChunkR它。)下一步是将未压缩的数据解码为Strings (text.utf8Decode例如),然后text.lines()用于逐行发出文本。这样的事情应该可以解决问题:

val converter: Task[Unit] =
  Process.constant(4096).toSource
    .through(io.fileChunkR("testdata/fahrenheit.zip"))
    .pipe(compress.inflate())
    .pipe(text.utf8Decode)
    .pipe(text.lines())
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run
于 2015-05-04T19:59:09.973 回答