我正在尝试编写一个枚举器,用于从java.io.BufferedReader
使用Scalaz 7 的 iteratee 库中逐行读取文件,该库目前只为java.io.Reader
.
我遇到的问题与我使用过的所有其他迭代库(例如Play 2.0和John Millikin 的enumerator
Step
Haskell)作为其类型的构造函数之一和 Scalaz 7都存在错误状态有关。没有。
我目前的实现
这是我目前拥有的。首先是一些导入和IO
包装器:
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }
def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())
还有一个类型别名来清理一下:
type ErrorOr[A] = Either[Throwable, A]
现在是一个tryIO
助手,模仿(松散地,可能是错误地)在enumerator
:
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
action.catchLeft.map(
r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
)
)
自身的枚举器BufferedReader
:
def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
lazy val reader = r
def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
tryIO(readLine(reader)) flatMap {
case Right(None) => s.pointI
case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
case Left(e) => k(I.elInput(Left(e)))
}
)
}
最后是一个负责打开和关闭阅读器的枚举器:
def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
tryIO(openFile(f)) flatMap {
case Right(reader) => I.iterateeT(
enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
)
case Left(e) => k(I.elInput(Left(e)))
}
)
}
现在假设我想将文件中包含至少 25 个'0'
字符的所有行收集到一个列表中。我可以写:
val action: IO[ErrorOr[List[String]]] = (
I.consume[ErrorOr[String], IO, List] %=
I.filter(_.fold(_ => true, _.count(_ == '0') >= 25)) &=
enumFile(new File("big.txt"))
).run.map(_.sequence)
在许多方面,这似乎工作得很好:我可以开始这个动作unsafePerformIO
,它会在几分钟内通过数千万行和千兆字节的数据,在恒定的内存中并且不会破坏堆栈,然后关闭阅读器完成后。如果我给它一个不存在的文件的名称,它会尽职尽责地将包含在 a 中的异常返回给我Left
,并且enumBuffered
如果它在读取时遇到异常,至少似乎表现得适当。
潜在问题
不过,我对我的实现有些担心——尤其是tryIO
. 例如,假设我尝试编写一些迭代:
val it = for {
_ <- tryIO[Unit, Unit](IO(println("a")))
_ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
r <- tryIO[Unit, Unit](IO(println("b")))
} yield r
如果我运行它,我会得到以下信息:
scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())
如果我在 GHCi 中尝试同样的事情enumerator
,结果更像我所期望的:
...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !
我只是看不到在 iteratee 库本身没有错误状态的情况下获得这种行为的方法。
我的问题
我并不声称自己是迭代器方面的专家,但我在一些项目中使用了各种 Haskell 实现,感觉我或多或少地理解了基本概念,并且曾经和 Oleg 喝过咖啡。不过,我在这里不知所措。这是在没有错误状态的情况下处理异常的合理方法吗?有没有一种方法可以实现tryIO
更像enumerator
版本?由于我的实现行为不同,是否有某种定时炸弹在等着我?