4

使用 SCIO fromspotify为 编写作业Dataflow,遵循 2 个示例e.g1e.g2PubSub将流写入到GCS,但以下代码出现以下错误

错误

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

代码

object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()

val sc = ScioContext(opts)


sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
  }
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))


val result = sc.close()

// CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}

我可能将窗口概念与 Bounded PCollection 混合在一起,有没有办法实现这一点,或者我需要应用一些变换来实现这一点,任何人都可以在这方面提供帮助

4

1 回答 1

3

我相信 SCIO 的底层saveAsTextFile使用了 Dataflow 的Write转换,它只支持有界 PCollections。Dataflow 尚未提供直接 API 来将无限 PCollection 写入 Google Cloud Storage,尽管这是我们正在调查的内容。

要在某处持久保存无界 PCollection,请考虑使用 BigQuery、Datastore 或 Bigtable。例如,在 SCIO 的 API 中,您可以使用saveAsBigQuery.

于 2016-10-05T17:14:43.850 回答