我在 cloud-dataflow 中有一个流式传输管道,我在其中设置Metrics.counter
如下。
class SomeDoFn extends DoFn {
val validIdCounter = Metrics.counter("user-type", "valid_ids")
val invalidIdCounter = Metrics.counter("user-type", "invalid_ids")
@ProcessElement
def process(c: ProcessContext): Unit = {
val userId = getId(c.element) match {
case Success(id) =>
validIdCounter.inc()
Some(id)
case Failure(e) =>
invalidIdCounter.inc()
None
}
...
}
我能够在 Stackdriver 监控中查看指标并为其创建警报。但是当我重新启动管道时,指标变为零。这是预期的行为吗?有没有办法可以跨作业和作业运行保留指标?