3

我有一个简单的工作,将数据从 pub sub 移动到 gcs。发布子主题是一个共享主题,具有许多不同大小的不同消息类型

我希望结果相应地在 GCS 垂直分区中:

架构/版本/年/月/日/

在该父键下应该是当天的一组文件,并且文件应该是合理的大小,即 10-200 mb

我正在使用 scio,并且我能够进行 groupby 操作来创建 [String, Iterable[Event]] 的 P/SCollection,其中键基于上述分区方案。

我无法使用默认文本接收器,因为它们不支持垂直分区,它只能将整个 pcollection 写入一个位置。而是遵循以下答案中的建议:

如何在 Apache Beam 中写入多个文件?

使用 DoFn 使用 Cloud Dataflow 从 PubSub 写入 Google Cloud Storage

我创建了一个简单的函数,将我的组写入 gcs。

object GcsWriter {

  private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService

  val EXTENSION = ".jsonl.gz"

  //todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
  //maybe beam is aimed at a different use case
  //this is an output 'transform' that writes text files
  //org.apache.beam.sdk.io.TextIO.write().to("output")


  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val byteOutputStream = new ByteArrayOutputStream()
    val compressedStream = new GZIPOutputStream(byteOutputStream)
    compressedStream.write(bytes)
    compressedStream.close()
    byteOutputStream.toByteArray
  }

  def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
    val bytes = items.mkString(start = "",sep ="\n" ,end = "\n").getBytes("UTF-8")
    val compressed = gzip(bytes)
    val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
    gcs.create(blobInfo, compressed)
  }

}

这可以工作并以我喜欢的方式写入文件,我使用以下带有固定窗口的触发规则:

val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
  val WINDOW_ELEMENT_MAX_COUNT = 5000
  val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) //this is the time after receiving late data to refiring
  val ALLOWED_LATENESS: Duration = Duration.standardHours(1)


  val WINDOW_OPTIONS = WindowOptions(
    trigger = AfterFirst.of(
      ListBuffer(
        AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
        AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
    allowedLateness = ALLOWED_LATENESS,
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
  )

基本上是在窗口结束时根据水印或收到 x 个元素时的复合触发。

问题在于源数据可能包含大小不一的消息。因此,如果我选择要触发的固定数量的元素,我将:

1)选择一个太大的数字,对于较大的事件组,它会炸毁工人上的 Java 堆 2)选择一个较小的数字,然后我最终会得到一些小文件用于安静的事件,我想在其中积累更多的事件在我的档案中。

我没有看到一个自定义触发器,我可以在其中传递一个 lambda,它输出每个元素的度量或类似的东西。有没有办法我可以实现自己的触发器来触发窗口中的字节数。

其他一些问题

我是否正确假设每个组中元素的迭代器在内存中而不是从存储中流式传输?如果不是,我可以以更节省内存的方式从迭代器流式传输到 gcs

对于我的 GCS 作家,我只是在地图或 ParDo 中进行操作。它没有实现文件输出接收器或看起来像 TextIo 的任何东西。这个简单的实现会不会有问题。在文档中它说,如果转换引发异常,它会被简单地重试(对于流媒体应用程序来说是不确定的)

4

0 回答 0