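I'm using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to change the pipeline so that it reads multiple topics and writes them out as gs://bucket/topic/...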
When reading only a single topic, I used TextIO in the last step of my pipeline:
TextIO.write()
    .to(
        new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
    .withWindowedWrites()
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1));
Here is a similar question, whose code I tried to adapt:
FileIO.<EventType, Event>writeDynamic()
    .by(
        new SerializableFunction<Event, EventType>() {
          @Override
          public EventType apply(Event input) {
            return EventType.TRANSFER; // should return real type here, just a dummy
          }
        })
    .via(
        Contextful.fn(
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event input) {
                return "Dummy"; // should return the Event converted to a String
              }
            }),
        TextIO.sink())
    .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
            currentMillisString),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String input) {
            return null; // Not sure what this should be exactly, but it needs to
                         // include the EventType in the path
          }
        }))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1))
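To make the goal concrete, here is a minimal plain-Java sketch of the path logic that the naming function left as null above would need to produce: one directory per topic (or event type), with the window start and shard index in the file name. Everything in it is an assumption for illustration, not Beam API: the TopicPaths class, its directoryFor and fileFor methods, the timestamp pattern, and the .txt suffix are all hypothetical. In a Beam pipeline, logic like this would presumably be called from whatever naming hook the dynamic write exposes.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper: builds per-topic output paths of the form gs://bucket/topic/... */
final class TopicPaths {
    // Assumed file-name stamp: UTC window start, with '-' instead of ':' to keep names simple.
    private static final DateTimeFormatter STAMP =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss", Locale.ROOT)
            .withZone(ZoneOffset.UTC);

    private TopicPaths() {}

    /** Directory prefix for one topic, e.g. gs://bucket/orders/ */
    static String directoryFor(String bucket, String topic) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        return String.format(Locale.ROOT, "gs://%s/%s/", bucket, topic);
    }

    /** Full file name for one shard of one window, placed under the topic's directory. */
    static String fileFor(
            String bucket, String topic, Instant windowStart, int shard, int numShards) {
        return String.format(
            Locale.ROOT,
            "%s%s-%05d-of-%05d.txt",
            directoryFor(bucket, topic),
            STAMP.format(windowStart),
            shard,
            numShards);
    }

    public static void main(String[] args) {
        // Prints gs://bucket/orders/2018-08-01T12-00-00-00000-of-00001.txt
        System.out.println(
            fileFor("bucket", "orders", Instant.parse("2018-08-01T12:00:00Z"), 0, 1));
    }
}
```

Keeping the path construction in a small pure function like this makes it testable without running a pipeline; the destination key (here the topic string) is the only input that changes between output directories.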
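The official JavaDoc contains example code that seems to have outdated method signatures (the .via method seems to have switched the order of its arguments). I also stumbled across an example in FileIO that confused me: shouldn't TransactionType and Transaction change places in this line?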