2

令人惊讶的是,我在 Iteratees 和错误处理方面遇到了一些问题。

问题;

从一个(来自网络,必须是 InputStream)中读取一些字节,InputStream在这个 InputStream 上做一些分块/grouing(用于工作分配),然后将其转换为case class DataBlock(blockNum: Int, data: ByteString)用于发送给参与者的转换(Array[Bytes] 转换为紧凑字节字符串)。

流量;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

编码;

class IterateeTest {
  val actor: ActorRef = myDataBlockRxActor(...)
  val is = new IntputStream(fromBytes...)
  val source = Enumerator.fromStream(is)
  val chunker = Traversable.takeUpTo[Array[Byte]](1000)
  val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
    (bNum, bytes) => DataBlock(bNum, CompactByteString(bytes)); bNum + 1
  }
  val fut = source &> chunker |>> transform
}

case class DataBlock(blockNum: Int, data: CompactByteString)

问题;

我当前的 Iteratee 代码运行良好。但是,我希望能够处理任何一方的故障;

  1. 当 InputStreamread方法失败时 - 我想知道有多少字节/块已成功处理并从该点继续读取流。当在枚举器中读取时抛出错误,fut只是返回异常,没有状态,所以我不知道我在做什么,除非我将它传递给 rxing 演员(我不想这样做)
  2. 如果输出端失败或无法再接收 DataBlock 消息,因为 Actor 的缓冲区已满,保持从输入流读取

我该怎么做?

因为我需要定义的错误处理,我怎么能/我会更好地使用反应流/Akka-stream(实验性)或scalaz迭代来尝试这个?

4

1 回答 1

2

(1) 可以用Enumeratee.

val (counter, getCount) = {
  var count = 0
  (Enumeratee.map { x => count += 1; x },
   () => count)
}
val fut = source &> counter &> chunker |>> transform

然后,您可以getCount在 a recoveror such on内使用fut

您可以通过 Play Iteratees 免费获得 (2) 个。在准备好接收更多数据之前,不会发生进一步的读取Iteratee,如果失败,则不会发生更多的读取。完成后会InputStream自动关闭Enumerator,无论是失败还是正常终止。

于 2014-06-16T21:53:08.730 回答