We currently have a Dataflow job that reads from Pub/Sub and writes Avro files to GCS using FileIO.writeDynamic. When we test at 10,000 events/sec, the pipeline cannot keep up because the WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards step is very slow. Below is the code snippet we use for the write. How can we improve this?
PCollection<Event> windowedWrites =
    input.apply(
        "Global Window",
        Window.<Event>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(50000),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(DurationUtils.parseDuration(windowDuration)))))
            // Beam requires allowed lateness to be set whenever a trigger is specified
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());
return windowedWrites.apply(
    "WriteToAvroGCS",
    FileIO.<EventDestination, Event>writeDynamic()
        .by(groupFn)
        .via(outputFn, Contextful.fn(new SinkFn()))
        .withTempDirectory(avroTempDirectory)
        .withDestinationCoder(destinationCoder)
        .withNumShards(1)
        .withNaming(namingFn));
We use custom file naming of the form gs://tenantID.<>/eventname/yyyy-mm-dd/<uniq_id>-<shardIndex>-of-<numShards>-pane-<paneIndex>.avro.
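For context, the namingFn is roughly as follows. This is a simplified sketch, not our exact code: the EventDestination accessors getTenantId()/getEventName() and the date handling are placeholders, and the gs://tenantID.<> bucket prefix is assumed to come from the write's output directory rather than from the naming function itself.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Sketch of the per-destination naming function. FileIO.Write.FileNaming is a
// functional interface, so it can be written as a lambda over
// (window, pane, numShards, shardIndex, compression).
SerializableFunction<EventDestination, FileIO.Write.FileNaming> namingFn =
    dest ->
        (window, pane, numShards, shardIndex, compression) -> {
          String date =
              DateTimeFormatter.ofPattern("yyyy-MM-dd")
                  .withZone(ZoneOffset.UTC)
                  .format(Instant.now());
          // Produces tenant/event/date/<uniq_id>-<shardIndex>-of-<numShards>-pane-<paneIndex>.avro,
          // resolved relative to the write's output directory.
          return String.format(
              "%s/%s/%s/%s-%d-of-%d-pane-%d.avro",
              dest.getTenantId(),
              dest.getEventName(),
              date,
              UUID.randomUUID(),
              shardIndex,
              numShards,
              pane.getIndex());
        };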