3

有人知道这个问题的解决方案吗?我将 try catch finally 构造重写为一种功能性的做事方式,但我现在无法关闭流:-)

import scala.util.control.Exception._
def gunzip() = {
  logger.info(s"Gunziping file ${f.getAbsolutePath}")
  catching(classOf[IOException], classOf[FileNotFoundException]).
    andFinally(println("how can I close the stream ?")).
    either ({
        val is = new GZIPInputStream(new FileInputStream(f))
        Stream.continually(is.read()).takeWhile(-1 !=).map(_.toByte).toArray
      }) match {
          case Left(e) =>
            val msg = s"IO error reading file ${f.getAbsolutePath} ! on host ${Setup.smtpHost}"
            logger.error(msg, e)
            MailClient.send(msg, msg)
            new Array[Byte](0)
          case Right(v) => v
        }
  }

我根据 Senia 的解决方案重写了它,如下所示:

def gunzip() = {
  logger.info(s"Gunziping file ${file.getAbsolutePath}")

  def closeAfterReading(c: InputStream)(f: InputStream => Array[Byte]) = {
    catching(classOf[IOException], classOf[FileNotFoundException])
      .andFinally(c.close())
      .either(f(c)) match {
      case Left(e) => {
        val msg = s"IO error reading file ${file.getAbsolutePath} ! on host ${Setup.smtpHost}"
        logger.error(msg, e)
        new Array[Byte](0)
      }
      case Right(v) => v
    }
  }

  closeAfterReading(new GZIPInputStream(new FileInputStream(file))) { is =>
    Stream.continually(is.read()).takeWhile(-1 !=).map(_.toByte).toArray
  }
}
4

4 回答 4

5

对于这种情况,我更喜欢这种结构:

def withCloseable[T <: Closeable, R](t: T)(f: T => R): R = {
  allCatch.andFinally{t.close} apply { f(t) }
}

def read(f: File) =
  withCloseable(new GZIPInputStream(new FileInputStream(f))) { is =>
    Stream.continually(is.read()).takeWhile(-1 !=).map(_.toByte).toArray
  }

现在你可以用一些异常来包装它Try并恢复:

val result =
  Try { read(f) }.recover{
    case e: IOException => recover(e) // logging, default value
    case e: FileNotFoundException => recover(e)
  }
val array = result.get // Exception here!
于 2013-06-21T11:32:38.520 回答
2
  1. 采取“scala-arm”
  2. 采取Apache “commons-io”

然后执行以下操作

val result = 
  for {fis <- resource.managed(new FileInputStream(f))
       gis <- resource.managed(new GZIPInputStream(fis))}
  yield IOUtils.toString(gis, "UTF-8")

result.acquireFor(identity) fold (reportExceptions _, v => v)
于 2013-06-21T12:24:34.053 回答
2

如何处理它的一种方法是使用一个可变的列表,这些列表已经打开并且需要稍后关闭:

val cs: Buffer[Closeable] = new ArrayBuffer();
def addClose[C <: Closeable](c: C) = { cs += c; c; }

catching(classOf[IOException], classOf[FileNotFoundException]).
  andFinally({ cs.foreach(_.close()) }).
  either ({
      val is = addClose(new GZIPInputStream(new FileInputStream(f)))
      Stream.continually(is.read()).takeWhile(-1 !=).map(_.toByte).toArray
    }) // ...

更新:您可以为此目的使用scala-conduit库(我是作者)。(该库目前还没有准备好生产。)管道(AKA conduids)的主要目的是构建具有良好定义的资源处理的可组合组件。每个管道重复接收输入并产生输入。可选地,它还可以在完成时产生最终结果。Pips 具有在管道完成后运行的终结器 - 无论是单独运行还是在其下游管道完成时运行。您的示例可以修改(使用 Java NIO),如下所示:

/**
 * Filters buffers until a given character is found. The last buffer
 * (truncated up to the character) is also included.
 */
def untilPipe(c: Byte): Pipe[ByteBuffer,ByteBuffer,Unit] = ...

// Create a new source that chunks a file as ByteBuffer's.
// (Note that the buffer changes on every step.)
val source: Source[ByteBuffer,Unit] = ...

// Sink that prints bytes to the standard output.
// You would create your own sink doing whatever you want.
val sink: Sink[ByteBuffer,Unit]
  = NIO.writeChannel(Channels.newChannel(System.out));

runPipe(source >-> untilPipe(-1) >-> sink);

一旦untilPipe(-1)找到-1并完成,它的上游source管道的终结器就会运行并关闭输入。如果管道中的任何地方发生异常,输入也会关闭。

完整的例子可以在这里找到。

于 2013-06-21T12:27:39.270 回答
0

对于像 java.io.Socket 这样的 closeble 对象可能运行很长时间的情况,我还有另一个建议,因此必须将其包装在 Future 中。当您还控制超时时,这是少数,当 Socket 没有响应时。

object CloseableFuture {
  type Closeable = {
    def close(): Unit
  }

  private def withClose[T, F1 <: Closeable](f: => F1, andThen: F1 => Future[T]): Future[T] = future(f).flatMap(closeable => {
    val internal = andThen(closeable)
    internal.onComplete(_ => closeable.close())
    internal
  })

  def apply[T, F1 <: Closeable](f: => F1, andThen: F1 => T): Future[T] =
    withClose(f, {c: F1 => future(andThen(c))})

  def apply[T, F1 <: Closeable, F2 <: Closeable](f1: => F1, thenF2: F1 => F2, andThen: (F1,F2) => T): Future [T] =
    withClose(f1, {c1:F1 => CloseableFuture(thenF2(c1), {c2:F2 => andThen(c1,c2)})})
}

在我为它打开一个 java.io.Socket 和 java.io.InputStream,然后执行从 WhoisServer 读取的代码后,我最终将它们都关闭了。完整代码:

CloseableFuture(
      {new Socket(server.address, WhoisPort)},
      (s: Socket) => s.getInputStream,
      (socket: Socket, inputStream: InputStream) => {
        val streamReader = new InputStreamReader(inputStream)
        val bufferReader = new BufferedReader(streamReader)
        val outputStream = socket.getOutputStream
        val writer = new OutputStreamWriter(outputStream)
        val bufferWriter = new BufferedWriter(writer)
        bufferWriter.write(urlToAsk+System.getProperty("line.separator"))
        bufferWriter.flush()
        def readBuffer(acc: List[String]): List[String] = bufferReader.readLine() match {
          case null => acc
          case str => {
            readBuffer(str :: acc)
          }
        }
        val result = readBuffer(Nil).reverse.mkString("\r\n")
        WhoisResult(urlToAsk, result)
      }
    )
于 2014-03-28T09:43:16.687 回答