我使用 DirectRunner 和一个从 Pub/Sub 读取数据的简单管道重现了这个问题:
该管道以消息的第一个单词作为键,应用 GroupByKey,然后把每个分组记录到日志中。
看起来 GBK 步骤会一直等待所有数据到达,而由于数据源是无界(unbounded)的,它永远不会输出任何结果。对我有效的做法是定义一个带触发器的窗口策略,例如:
/**
 * Small streaming example: reads string messages from a Pub/Sub subscription,
 * keys each message by its first space-separated token, groups by that key inside
 * fixed windows with an explicit processing-time trigger, and logs how many
 * messages were seen per key.
 *
 * The explicit trigger is what lets the `groupByKey` emit panes while the
 * source keeps producing data.
 */
object PubSubTest {
  private lazy val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Builds and runs the pipeline, blocking until it finishes.
   *
   * @param cmdlineArgs command-line arguments handed to `ContextAndArgs`; the
   *                    `input` argument selects the subscription name and
   *                    defaults to `test_sub` when absent
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val defaultInputSub = "test_sub"
    val subscription = args.getOrElse("input", defaultInputSub)
    // NOTE(review): this literal looks like a placeholder — substitute the real project id.
    val project = "PROJECT_ID"

    sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
      // provide window options including triggering:
      // 10-second fixed windows; within a window the trigger fires repeatedly,
      // 2 seconds (processing time) after the first element of each pane.
      .withFixedWindows(
        duration = Duration.standardSeconds(10),
        options = WindowOptions(
          trigger = Repeatedly.forever(
            AfterProcessingTime
              .pastFirstElementInPane()
              .plusDelayOf(Duration.standardSeconds(2))
          ),
          // Fired panes accumulate, so a later firing for the same window
          // includes the elements already reported by earlier firings.
          accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
          closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY,
          allowedLateness = Duration.standardSeconds(0)
        )
      )
      // use first word of the Pub/Sub message as the key.
      // `split` drops trailing empty strings, so a message made only of spaces
      // yields an empty array; fall back to "" instead of indexing element 0,
      // which would throw ArrayIndexOutOfBoundsException.
      .keyBy(_.split(" ").headOption.getOrElse(""))
      .groupByKey
      .map { case (key, values) => log.info(s"$key was repeated ${values.size} times") }

    sc.close().waitUntilFinish()
  }
}