使用 scio 0.4.7 版,我有一个正在监听 pubsub 主题的流式作业,我在这里使用事件处理,RFC3339 中的消息属性上存在“时间戳”属性
val rtEvents: SCollection[RTEvent] = sc.pubsubTopic(args("topic"), timestampAttribute = "timestamp").map(jsonToObject)
val windowedEvents = rtEvents.withFixedWindows(Duration.standardMinutes(1L),
options = WindowOptions(trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
accumulationMode = DISCARDING_FIRED_PANES,
allowedLateness = Duration.standardSeconds(1L)
)
)
我使用 windowedEvents 在管道中进行进一步的聚合和计算
doSomeAggregation(windowedEvents)
def doSomeAggregation(events: SCollection[RTEvent]): SCollection[(String, Map[String, Int])] =
events.map(e => (e.properties.key, (e.properties.category, e.id)))
.groupByKey
.map { case (key, tuple: Iterable[(String, String)]) =>
val countPerCategory: Map[String, Int] = tuple.groupBy(_._1)
.mapValues(_.toList.distinct.size)
//some other http post and logging here
(key, countPerCategory)
}
sc.close().waitUntilFinish()
如果我在谷歌数据流上使用以下自动缩放参数运行作业
--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=4
如果只有一名工作人员在运行,则作业运行并且固定窗口正确触发。一旦作业自动扩展到超过 1 个工作人员,固定窗口就会停止触发,初始 pubsub 步骤的系统延迟和挂墙时间会持续增长,而数据水印不会向前移动。
我的触发器设置有问题吗?有没有其他人在数据流运行器或其他运行器上遇到过这种情况?任何帮助是极大的赞赏。如果我不能解决这个问题,我倾向于放弃 scio 并恢复到 apache-beam java sdk。