0

使用 scio 0.4.7 版,我有一个正在监听 pubsub 主题的流式作业,我在这里使用事件处理,RFC3339 中的消息属性上存在“时间戳”属性

val rtEvents: SCollection[RTEvent] = sc.pubsubTopic(args("topic"), timestampAttribute = "timestamp").map(jsonToObject)
val windowedEvents = rtEvents.withFixedWindows(Duration.standardMinutes(1L),
  options = WindowOptions(trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
    accumulationMode = DISCARDING_FIRED_PANES,
    allowedLateness = Duration.standardSeconds(1L)
  )
)

我使用 windowedEvents 在管道中进行进一步的聚合和计算

doSomeAggregation(windowedEvents)

def doSomeAggregation(events: SCollection[RTEvent]): SCollection[(String, Map[String, Int])] =
        events.map(e => (e.properties.key, (e.properties.category, e.id)))
          .groupByKey
          .map { case (key, tuple: Iterable[(String, String)]) =>
            val countPerCategory: Map[String, Int] = tuple.groupBy(_._1)
              .mapValues(_.toList.distinct.size)
            //some other http post and logging here
            (key, countPerCategory)
          }

    sc.close().waitUntilFinish()

如果我在谷歌数据流上使用以下自动缩放参数运行作业

--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=4

如果只有一名工作人员在运行,则作业运行并且固定窗口正确触发。一旦作业自动扩展到超过 1 个工作人员,固定窗口就会停止触发,初始 pubsub 步骤的系统延迟和挂墙时间会持续增长,而数据水印不会向前移动。

我的触发器设置有问题吗?有没有其他人在数据流运行器或其他运行器上遇到过这种情况?任何帮助是极大的赞赏。如果我不能解决这个问题,我倾向于放弃 scio 并恢复到 apache-beam java sdk。

4

1 回答 1

2

我设法解决了这个问题。在我目前的设置中,工作人员无法相互通信。作业无声无息地失败,没有任何超时错误(光束可能会作为错误传播)。

如果您使用数据流作为运行器,请确保为项目中的数据流定义的防火墙是为“默认”网络定义的。

如果为您的网络定义了数据流防火墙,您需要将额外的运行时参数传递到您的作业中

--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=4 --network='your-network'

于 2018-02-28T01:26:01.053 回答