我想使用 Apache Beam 中的流管道(并在 Google DataFlow 上运行)来实现以下场景:
- 从 Pub/Sub 读取消息(JSON 字符串)
- 反序列化 JSON
- 使用自定义字段(比如
timeStamp
)作为处理元素的时间戳值 - 应用固定窗口
60 seconds
- 从元素中提取密钥并按密钥分组
- <<进行进一步处理>>
我尝试使用 Java(Scala) 和 Python 解决这个问题,但没有一个解决方案有效。
- Python解决方案
# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
| beam.Map(add_timestamping)
| beam.WindowInto(window.FixedWindows(60))
| beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
| beam.GroupByKey()
# (...)
| beam.io.WriteToPubSub("output_topic")
)
p.run()
add_timestamping
根据文档功能:
def add_timestamping(elem):
import json
import apache_beam as beam
msg = json.loads(elem)
unix_timestamp = msg['timeStamp'] / 1000
return beam.window.TimestampedValue(msg, unix_timestamp)
Python解决方案的输出:
- 使用时
DirectRunner
,会发出窗口,并且窗口本身或多或少是合适的,具体取决于延迟。 - 使用时
DataFlowRunner
,会跳过所有消息,并在 DataFlow UI 中显示计数器:dropDueToLateness。
- Java / Scala 解决方案 (我使用过Scio,但这也发生在 Java 中的干净 Beam SDK 中)
sc.pubsubSubscription[String]("my_sub")
.applyTransform(ParDo.of(new CustomTs()))
.withFixedWindows(Duration.standardSeconds(60))
.map(x => x) // exracting the key somehow, not relevant here
.groupByKey
// (...)
.saveAsPubsub("output_topic")
根据文档添加自定义时间戳:
import io.circe.parser._
class CustomTs extends DoFn[String, String] {
@ProcessElement
def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
val json = parse(element).right.get
val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
out.outputWithTimestamp(element, new Instant(timestampMillis))
}
}
Java / Scala 解决方案的输出:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException:
Cannot output with timestamp 2019-03-02T00:51:39.124Z.
Output timestamps must be no earlier than the timestamp of the current input
(2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
我不能DoFn.getAllowedTimestampSkew
在这里使用,因为它已经被弃用了,而且我不知道将发送什么范围的历史数据。
处理历史数据的能力对我的项目至关重要(这些数据将从某个商店发送到 Pub/Sub)。管道必须同时处理当前数据和历史数据。
我的问题是: 如何使用自定义时间戳处理数据,并能够在使用 Beam API 定义的窗口上进行操作?