6

上下文:我正在尝试编写一个Process1[ByteVector, spray.http.HttpResponsePart]with output ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd。我还没有完全理解 scalaz-stream 及其词汇。

如何编写一个可以以n不同方式处理第一个块的进程?

我想出了这个(以字符串为例):

val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _))

val headerChunkAndRest: Process1[String, String] =
  headerChunk.take(1) ++ process1.id

io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt")))
  .pipe(headerChunkAndRest)
  .to(io.stdOutLines)
  .run.run

什么是惯用的并且可能是通常可组合的写作方式headerChunkAndRest

4

1 回答 1

4

一般注意事项

有几种方法可以做到这一点,很大程度上取决于您的需求细节。您可以使用以下属于 scalaz-streams 的辅助方法:

  1. foldWithIndex这为您提供了块的当前索引作为数字。您可以根据该索引进行区分
  2. zipWithState您可以将方法的一次调用中的状态添加到下一次调用中,并使用此状态来跟踪您是否仍在解析标头或是否已到达正文。在下一步中,您可以使用此状态来处理不同的标题和正文
  3. repartition使用它可以将所有标题和所有正文元素组合在一起。然后,您可以在下一步中处理它们。
  4. zipWithNext此功能始终向您显示与当前元素分组的前一个元素。您可以使用它来检测何时从标题切换到正文并做出相应反应。

可能你应该重新思考,你真正需要什么。对于您的问题,它将是zipwithIndexand then map。但是如果你重新考虑你的问题,你可能会以repartitionor结束zipWithState

示例代码

让我们举一个简单的例子:一个 HTTP 客户端,它将 HTTP 标头元素与正文(HTTP,而不是 HTML)分开。在标头中是 cookie 之类的东西,在正文中是真正的“内容”,例如图像或 HTTP 源。

一个简单的 HTTP 客户端可能如下所示:

import scalaz.stream._
import scalaz.concurrent.Task
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

implicit val AG = nio.DefaultAsynchronousChannelGroup

def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] =
  Process(
    s"GET $path HTTP/1.1",
    s"Host: $hostname",
    "Accept: */*",
    "User-Agent: scalaz-stream"
  ).intersperse("\n").append(Process("\n\n"))

def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] =
  nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines())

现在我们可以使用此代码将标题行与其他行分开。在 HTTP 中,标头是按行构造的。它由一条空线与正文分开。所以首先,让我们计算标题中的行数:

val demoHostName="scala-lang.org" // Hope they won't mind...
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8))

当我运行这个时,标题中有 8 行。让我们首先定义一个枚举,因此对响应的各个部分进行分类:

object HttpResponsePart {
  sealed trait EnumVal
  case object HeaderLine extends EnumVal
  case object HeaderBodySeparator extends EnumVal
  case object Body extends EnumVal
  val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body)
}

然后让我们使用zipWithIndexplusmap对响应的各个部分进行分类:

simpleHttpClient(demoHostName).zipWithIndex.map{
  case (line, idx) if idx < 9 => (line, HeaderLine)
  case (line, idx) if idx == 10 => (line, HeaderBodySeparator)
  case (line, _) => (line, Body)
}.take(15).runLog.run

对我来说,这很好用。但当然,标题行的数量可以随时更改,恕不另行通知。使用考虑响应结构的非常简单的解析器会更加健壮。为此我使用zipWithState

simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){
  case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator
  case (_, HeaderLine) => HeaderLine
  case (_, HeaderBodySeparator) => Body
  case (line, Body) => Body
}.take(15).runLog.run

您可以看到,两种方法都使用相似的结构,并且两种方法都应该导致相同的结果。好的是,这两种方法都很容易重复使用。您可以只换掉源,例如用一个文件,而不必改变任何东西。与分类后的处理相同。这.take(15).runLog.run两种方法完全相同。

于 2015-03-04T11:15:16.023 回答