令人惊讶的是,我在 Iteratees 和错误处理方面遇到了一些问题。
问题;
从一个(来自网络,必须是 InputStream)中读取一些字节,InputStream
在这个 InputStream 上做一些分块/grouing(用于工作分配),然后将其转换为case class DataBlock(blockNum: Int, data: ByteString)
用于发送给参与者的转换(Array[Bytes] 转换为紧凑字节字符串)。
流量;
InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors
编码;
class IterateeTest {
val actor: ActorRef = myDataBlockRxActor(...)
val is = new IntputStream(fromBytes...)
val source = Enumerator.fromStream(is)
val chunker = Traversable.takeUpTo[Array[Byte]](1000)
val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
(bNum, bytes) => DataBlock(bNum, CompactByteString(bytes)); bNum + 1
}
val fut = source &> chunker |>> transform
}
case class DataBlock(blockNum: Int, data: CompactByteString)
问题;
我当前的 Iteratee 代码运行良好。但是,我希望能够处理任何一方的故障;
- 当 InputStream
read
方法失败时 - 我想知道有多少字节/块已成功处理并从该点继续读取流。当在枚举器中读取时抛出错误,fut
只是返回异常,没有状态,所以我不知道我在做什么,除非我将它传递给 rxing 演员(我不想这样做) - 如果输出端失败或无法再接收 DataBlock 消息,因为 Actor 的缓冲区已满,保持从输入流读取
我该怎么做?
因为我需要定义的错误处理,我怎么能/我会更好地使用反应流/Akka-stream(实验性)或scalaz迭代来尝试这个?