1

我有一个类,它获取一个本地文件,对其进行转换并将其存储在 GCS 中:

import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    if (destination.unzipGzip) {
      for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
           output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
        ByteStreams.copy(input, output)
      }
    } else if (destination.decompressBzip2) {
      for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        ByteStreams.copy(input, output)
      }
    } else {
      for (input <- managed(Files.newInputStream(localPath));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        IOUtils.copy(input, output)
      }
    }
  }

}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

我正在尝试删除一些代码重复,特别是创建fileInputStreamand gcsOutputStream。但我不能简单地在方法顶部提取这些变量,因为它会在 scala-armmanaged块之外创建资源:

import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    // FIXME: creates a resource outside of the ARM block
    val fileInputStream = Files.newInputStream(localPath)
    val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))

    if (destination.unzipGzip) {
      unzipGzip(fileInputStream, gcsOutputStream)
    } else if (destination.decompressBzip2) {
      decompressBzip2(fileInputStream, gcsOutputStream)
    } else {
      copy(fileInputStream, gcsOutputStream)
    }
  }

  private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input ← managed(new ZipInputStream(inputStream));
         output ← managed(new GZIPOutputStream(outputStream))) {
      ByteStreams.copy(input, output)
    }
  }

  private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new BZip2CompressorInputStream(inputStream));
         output <- managed(outputStream)) {
      ByteStreams.copy(input, output)
    }
  }

  private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(inputStream);
         output <- managed(outputStream)) {
      IOUtils.copy(input, output)
    }
  }
}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

如您所见,代码更清晰,更可测试,但资源没有被正确处理,因为它们不是“托管”的。例如,如果在创建时抛出异常gcsOutputStreamfileInputStream则不会关闭。

我可能可以使用Google Guava sources and sinks来解决这个问题,但我想知道在 Scala 中是否有更好的方法来解决这个问题,而无需引入 Guava。理想情况下使用标准库或 scala-arm 功能,甚至可能在Cats?

  • 我应该将fileInputStreamand定义gcsOutputStream为不带任何内容并返回流的函数吗?似乎代码在任何地方都会更加() => InputStream冗长() => OutputStream
  • 我应该使用多个scala-arm“管理”来理解(一个定义fileInputStreamand gcsOutputStream,另一个在每个子函数内)?如果我这样做,“内部”输入流将被关闭两次不是问题吗?
  • 是否有一种我没有看到的干净和“scalaish”的方法来做到这一点?
4

1 回答 1

1

你可以像这样重构它:

首先,声明托管资源:

val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))

它不会打开这些资源,它只是声明您希望管理这些资源。

然后您可以使用map将它们包装在所需的装饰器中(如ZipInputStream):

if (destination.unzipGzip) {
  for (input ← fileInputStream.map(s => new ZipInputStream(s));
       output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
} else if (destination.decompressBzip2) {
  for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
       output <- gcsOutputStream) {
    ByteStreams.copy(input, output)
  }
} else {
  for (input <- fileInputStream;
       output <- gcsOutputStream) {
    IOUtils.copy(input, output)
  }
}

当然ManagedResource[A]只是值,所以你甚至可以将它作为参数传递给方法:

private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
  for (input ← inputStream.map(s => new ZipInputStream(s));
       output ← outputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
}
于 2020-01-23T08:22:42.407 回答