
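I have a simple pipeline that reads from Pub/Sub in fixed windows, parses the messages, and groups them by a specific attribute. However, if I map after groupBy, my function never seems to be executed.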

Am I missing something?

sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
  .withFixedWindows(Duration.standardSeconds(windowSeconds))
  .map(parseMessage)
  .groupBy(_.ip_address)
  .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))
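For reference, the result the question expects can be illustrated without Beam. The following is a plain-Scala sketch, not Scio code, of what fixed windows followed by grouping on a key compute: each element is assigned to the window [start, start + size) containing its event timestamp, and elements are counted per (window, key). Event, ipAddress, timestampMillis, FixedWindowSketch and countPerWindow are hypothetical names standing in for the parsed message. The sketch assumes Beam's default zero window offset and non-negative timestamps, and it deliberately ignores triggers and watermarks, which decide when a real pipeline emits each result.

```scala
// Plain-Scala sketch (no Beam/Scio): what "fixed windows + group by key" computes.
// Event and its fields are hypothetical stand-ins for the parsed Pub/Sub message.
final case class Event(ipAddress: String, timestampMillis: Long)

object FixedWindowSketch {
  // Start of the fixed window [start, start + sizeMillis) containing ts.
  // Assumes non-negative timestamps and a zero window offset.
  def windowStart(ts: Long, sizeMillis: Long): Long = ts - ts % sizeMillis

  // Count occurrences of each key inside each window, keyed by (windowStart, ip).
  def countPerWindow(events: Seq[Event], sizeMillis: Long): Map[(Long, String), Int] =
    events
      .groupBy(e => (windowStart(e.timestampMillis, sizeMillis), e.ipAddress))
      .map { case (windowAndKey, es) => windowAndKey -> es.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("10.0.0.1", 1000L),
      Event("10.0.0.1", 4000L),
      Event("10.0.0.2", 9000L),
      Event("10.0.0.1", 12000L)
    )
    // Print one line per (window, key), ordered by window start and then key.
    countPerWindow(events, sizeMillis = 10000L).toSeq.sorted.foreach {
      case ((start, ip), n) => println(s"window@$start: $ip was repeated $n times")
    }
  }
}
```

In a streaming pipeline, each of these per-window groups is only handed to the following map once the window's trigger fires.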

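I was able to reproduce the issue on the DirectRunner with a simple pipeline that reads from Pub/Sub, uses the first word of each message as the key, applies GroupByKey, and then logs the entries. It seems the GBK step keeps waiting for more data before emitting anything: with the default trigger, a window's results are only emitted once the watermark passes the end of that window, and with this unbounded source that never happened in my test, so no results were emitted. What worked for me was defining a windowing strategy with an explicit trigger, for example: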

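// Scio 0.6-era API (2018); method names may differ in newer Scio versions
import com.spotify.scio._
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, Repeatedly}
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration
import org.slf4j.LoggerFactory

object PubSubTest {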
  private lazy val log = LoggerFactory.getLogger(this.getClass)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val defaultInputSub = "test_sub"
    val subscription = args.getOrElse("input", defaultInputSub)
    val project = "PROJECT_ID"

    sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
      // provide window options including triggering
      .withFixedWindows(duration = Duration.standardSeconds(10), options = WindowOptions(
        trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(2))),
        accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
        closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY,
        allowedLateness = Duration.standardSeconds(0))
      )
      // use first word of the Pub/Sub message as the key
      .keyBy(a => a.split(" ")(0))
      .groupByKey
      .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

    val result = sc.close().waitUntilFinish()
  }
}
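To illustrate what this trigger changes, here is a plain-Scala simulation (not Beam code; TriggerSketch, Firing and firings are hypothetical names) of how Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)) splits one key's elements within one window into fired panes, and how ACCUMULATING_FIRED_PANES differs from DISCARDING_FIRED_PANES. It is a simplified model under stated assumptions: inputs are processing-time arrival instants in milliseconds, an element arriving exactly at a firing instant is counted in that firing, and the final pane emitted when the window closes is not modelled.

```scala
import scala.annotation.tailrec

// Simplified simulation (not Beam) of a repeated processing-time trigger
// for a single key in a single window.
object TriggerSketch {
  final case class Firing(atMillis: Long, count: Int)

  def firings(arrivals: Seq[Long], delayMillis: Long, accumulating: Boolean): List[Firing] = {
    @tailrec
    def loop(pending: List[Long], firedSoFar: Int, acc: List[Firing]): List[Firing] =
      pending match {
        case Nil => acc.reverse
        case first :: _ =>
          // A new pane starts with the first element after the previous firing
          // and fires delayMillis after that element arrives.
          val fireAt = first + delayMillis
          val (inPane, rest) = pending.span(_ <= fireAt)
          val total = firedSoFar + inPane.size
          // Accumulating: re-emit everything seen so far in the window.
          // Discarding: emit only the elements of the current pane.
          val emitted = if (accumulating) total else inPane.size
          loop(rest, total, Firing(fireAt, emitted) :: acc)
      }
    loop(arrivals.sorted.toList, 0, Nil)
  }

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(0L, 500L, 1500L, 5000L, 6000L)
    println(firings(arrivals, delayMillis = 2000L, accumulating = true))
    println(firings(arrivals, delayMillis = 2000L, accumulating = false))
  }
}
```

Because the answer uses ACCUMULATING_FIRED_PANES, a later log line for the same key within a window reports a running total (earlier elements plus new ones). Switching to AccumulationMode.DISCARDING_FIRED_PANES would log only the elements that arrived since the previous firing.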
answered 2018-09-16T20:44:59.120