这是我经过几个小时的实验后得到的实现。我希望有人能给出一个更优雅的版本,因为连我自己都几乎看不懂我写的这段代码。
/**
 * Builds an `Enumeratee` that regroups the incoming elements into the chunks
 * produced by `chunker` and forwards every complete chunk to the inner iteratee.
 *
 * Input that does not yet form a complete chunk is carried over and prepended to
 * the next element. On EOF the carried-over tail is offered to `chunker` one last
 * time; if that still does not yield a chunk the tail is dropped.
 *
 * NOTE(review): `AB` is declared outside this snippet — presumably
 * `Array[Byte]`; confirm against the enclosing file.
 *
 * @param chunker iteratee that consumes input and yields one chunk
 */
def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] {

  def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = {

    // Suspend until the next input arrives, remembering the current inner
    // iteratee and the not-yet-chunked tail.
    def awaitInput(sink: Iteratee[AB, A], pending: Input[AB]): Iteratee[AB, Iteratee[AB, A]] =
      Cont((next: Input[AB]) => step(next, sink, pending))

    // End of stream: try to turn the carried-over tail into one final chunk.
    def flush(sink: Iteratee[AB, A], pending: Input[AB]): Iteratee[AB, Iteratee[AB, A]] =
      pending match {
        case Input.El(_) =>
          val finalChunk = Iteratee.flatten(
            Enumerator.enumInput(pending) >>> Enumerator.eof |>> chunker)
          finalChunk.pureFlatFold(
            done = { (chunk, _) =>
              val fed = Iteratee.flatten(Enumerator(chunk) |>> sink)
              fed.pureFlatFold(
                done = (_, remaining) => Done(fed, remaining),
                // no further input will ever arrive, so finish regardless
                cont = _ => Done(fed, Input.EOF),
                error = (msg, remaining) => Error(msg, remaining))
            },
            // the tail never became a chunk: discard it
            cont = _ => Done(sink, Input.EOF),
            error = (msg, remaining) => Error(msg, remaining))
        case Input.EOF | Input.Empty =>
          // nothing was carried over
          Done(sink, pending)
      }

    // A new element arrived: chunk (tail + element) and push the chunks inward.
    def feed(bytes: AB, sink: Iteratee[AB, A], pending: Input[AB]): Iteratee[AB, Iteratee[AB, A]] = {
      // The trailing EOF makes `repeat` hand back whatever it withheld as `rest`.
      val chunked = Iteratee.flatten(
        Enumerator.enumInput(pending)
          >>> Enumerator(bytes)
          >>> Enumerator.eof
          |>> repeat(chunker))
      chunked.pureFlatFold(
        done = { (chunks, rest) =>
          val fed = Iteratee.flatten(Enumerator(chunks: _*) |>> sink)
          fed.pureFlatFold(
            done = (_, remaining) => Done(fed, remaining),
            // the inner iteratee wants more data
            cont = _ => Cont((next: Input[AB]) => step(next, fed,
              // the EOF injected above is not a real end of stream
              if (rest == Input.EOF) Input.Empty else rest)),
            error = (msg, remaining) => Error(msg, remaining))
        },
        // no chunk could be produced yet: keep waiting with the same state
        cont = _ => awaitInput(sink, pending),
        error = (msg, remaining) => Error(msg, remaining))
    }

    // Dispatch on the kind of input received.
    def step(input: Input[AB], sink: Iteratee[AB, A], pending: Input[AB]): Iteratee[AB, Iteratee[AB, A]] =
      input match {
        case Input.El(bytes) => feed(bytes, sink, pending)
        case Input.Empty     => awaitInput(sink, pending)
        case Input.EOF       => flush(sink, pending)
      }

    awaitInput(inner, Input.Empty)
  }
}
下面是我自定义的 `repeat` 的定义:
/**
 * Runs `chunker` over and over, collecting every chunk it yields into a `Vector`.
 *
 * The last, possibly incomplete, chunk is withheld: on EOF it is returned as the
 * remaining input of the resulting `Done` instead of being appended to the
 * vector, so that the caller can concatenate it with the next element.
 *
 * @param chunker iteratee that consumes input and yields one chunk
 */
def repeat(chunker: Iteratee[AB, AB]) = {

  // Suspend until more input arrives, keeping the current parser state.
  def suspend(parser: Iteratee[AB, AB], chunks: Vector[AB], carry: Input[AB]): Iteratee[AB, Vector[AB]] =
    Cont((next: Input[AB]) => loop(next, parser, chunks, carry))

  // EOF reached: return the collected chunks plus whatever was withheld.
  def finish(parser: Iteratee[AB, AB], chunks: Vector[AB], carry: Input[AB]): Iteratee[AB, Vector[AB]] =
    parser.pureFlatFold(
      done = (_, _) => Done(chunks, carry),
      // Force the in-progress parser to conclude; its result becomes the leftover.
      cont = k => k(Input.EOF).pureFlatFold(
        done = (partial, _) => Done(chunks, Input.El(partial)),
        // a parser that still wants input after EOF cannot be completed
        cont = _ => sys.error("divergent iter"),
        error = (msg, rest) => Error(msg, rest)),
      error = (msg, rest) => Error(msg, rest))

  // Push the carried-over input followed by the new element into the parser.
  def advance(in: Input[AB], parser: Iteratee[AB, AB], chunks: Vector[AB],
              carry: Input[AB]): Iteratee[AB, Vector[AB]] = {
    val fed = Iteratee.flatten(
      Enumerator.enumInput(carry) >>> Enumerator.enumInput(in) |>> parser)
    fed.pureFlatFold(
      // one chunk finished: restart with a fresh chunker on the remaining input
      done = (chunk, rest) => loop(rest, chunker, chunks :+ chunk, Input.Empty),
      cont = _ => suspend(fed, chunks, Input.Empty),
      error = (msg, rest) => Error(msg, rest))
  }

  def loop(in: Input[AB], parser: Iteratee[AB, AB], chunks: Vector[AB],
           carry: Input[AB]): Iteratee[AB, Vector[AB]] =
    in match {
      case Input.El(_) => advance(in, parser, chunks, carry)
      case Input.Empty => suspend(parser, chunks, carry)
      case Input.EOF   => finish(parser, chunks, carry)
    }

  suspend(chunker, Vector(), Input.Empty)
}
这段代码在一些示例上可以正常工作,包括下面这个:
// Sample input: three byte arrays whose line breaks do not line up with the
// element boundaries ("bippy" has no terminator, the second element ends with
// a dangling "b" that continues as "azam" in the third).
val source = Enumerator(
"bippy".getBytes,
"foo\n\rbar\n\r\n\rbaz\nb".getBytes,
"azam\ntoto\n\n".getBytes)
// Re-chunk the stream with chunkBy, then append ".\n" to every chunk before
// streaming the result as text/plain.
// NOTE(review): `line` is defined outside this snippet — presumably an
// Iteratee[AB, AB] that reads a single line; confirm.
Ok.stream(source
&> chunkBy(line)
&> Enumeratee.map(l => l ++ ".\n".getBytes)
).as("text/plain")
它的输出如下:
bippyfoo.
bar.
baz.
bazam.
toto.