
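I'm using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to change the pipeline so that it reads multiple topics and writes them out as gs://bucket/topic/...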

When reading only a single topic, I used TextIO in the last step of my pipeline:

TextIO.write()
    .to(
        new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
    .withWindowedWrites()
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1)

There is a similar question whose code I tried to adapt:

FileIO.<EventType, Event>writeDynamic()
    .by(
        new SerializableFunction<Event, EventType>() {
          @Override
          public EventType apply(Event input) {
            return EventType.TRANSFER; // should return real type here, just a dummy
          }
        })
    .via(
        Contextful.fn(
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event input) {
                return "Dummy"; // should return the Event converted to a String
              }
            }),
        TextIO.sink())
    .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                                                            currentMillisString),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String input) {
            return null; // Not sure what exactly this should be, but it needs to
                         // include the EventType in the path
          }
        }))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1)

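The official JavaDoc contains example code that seems to use outdated method signatures (the .via method appears to have switched the order of its arguments). I also stumbled across an example in FileIO that confused me: shouldn't TransactionType and Transaction swap places in this line?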


1 Answer


After a night's sleep and a fresh start I found the solution. I used the functional Java 8 style because it makes the code shorter (and more readable):

  .apply(
    FileIO.<String, Event>writeDynamic()
        // Destination of each element: the Kafka topic it came from
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        // Each Event is written as one text line containing its payload
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        .to(String.format("gs://bucket/data%s/", suffix))
        // One FileNaming per destination, so the topic ends up in the file path
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));
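To make the split between .by(...) and .via(...) concrete, here is a plain-Java sketch (no Beam dependency) of what the dynamic write does conceptually: by picks a destination per element (here the topic), via turns each element into the line that gets written, and every destination ends up under its own path. The Event class is a hypothetical stand-in for the POJO described below, and DynamicWriteSketch is illustrative only; Beam's real grouping, sharding and temp-file handling are not modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the answer's Event POJO (topic + raw payload).
final class Event {
    private final String topic;
    private final String payload;

    Event(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }

    String getTopic() { return topic; }

    String getPayload() { return payload; }
}

// Illustrative helper, not part of Beam.
final class DynamicWriteSketch {
    private DynamicWriteSketch() {}

    // Collects the formatted output lines per destination, mirroring the roles of
    // writeDynamic().by(destinationFn).via(outputFn, sink) in plain Java.
    static <D, T> Map<D, List<String>> groupByDestination(
            List<T> elements, Function<T, D> destinationFn, Function<T, String> outputFn) {
        Map<D, List<String>> byDestination = new LinkedHashMap<>();
        for (T element : elements) {
            byDestination
                .computeIfAbsent(destinationFn.apply(element), d -> new ArrayList<>())
                .add(outputFn.apply(element));
        }
        return byDestination;
    }

    // Directory a topic's files end up in: the .to(...) directory plus the topic
    // (suffix is "" or "dev", as in the pipeline above).
    static String prefixFor(String suffix, String topic) {
        return String.format("gs://bucket/data%s/%s/", suffix, topic);
    }
}
```

Grouping three events from two topics yields two destinations, each holding only its own payload lines, which is the per-topic layout the question asks for.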

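Explanation:

  • Event is a Java POJO that contains the payload of the Kafka message and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step
  • suffix is either dev or empty and is set via an environment variable
  • currentMillisString contains the timestamp of when the whole pipeline was launched, so that new files don't overwrite old files on GCS when the pipeline is restarted
  • FileNaming implements a custom naming scheme and receives the type of the event (its topic) in its constructor; it uses a custom formatter to write into daily-partitioned "sub-folders" on GCS: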

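    import java.util.TimeZone;
    import org.apache.beam.sdk.io.Compression;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    class FileNaming implements FileIO.Write.FileNaming {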
      static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
        return new FileNaming(topic, suffix, currentMillisString);
      }
    
      private static final DateTimeFormatter FORMATTER = DateTimeFormat
          .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));
    
      private final String topic;
      private final String suffix; // appended to the end of each file name, e.g. ".txt"
      private final String currentMillisString;
    
      private String filenamePrefixForWindow(IntervalWindow window) {
        return String.format(
            "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
      }
    
      private FileNaming(String topic, String suffix, String currentMillisString) {
        this.topic = topic;
        this.suffix = suffix;
        this.currentMillisString = currentMillisString;
      }
    
      @Override
      public String getFilename(
          BoundedWindow window,
          PaneInfo pane,
          int numShards,
          int shardIndex,
          Compression compression) {
    
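        // Assumes the input is windowed into IntervalWindows (e.g. fixed windows);
        // a GlobalWindow would make this cast fail.
        IntervalWindow intervalWindow = (IntervalWindow) window;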
        String filenamePrefix = filenamePrefixForWindow(intervalWindow);
        String filename =
            String.format(
                "pane-%d-%s-%05d-of-%05d%s",
                pane.getIndex(),
                pane.getTiming().toString().toLowerCase(),
                shardIndex,
                numShards,
                suffix);
        String fullName = filenamePrefix + filename;
        return fullName;
      }
    }
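The path that FileNaming.getFilename builds can be checked without running a pipeline. Below is a plain-Java re-creation of its string formatting, a sketch under two assumptions: java.time stands in for Joda-Time, and the Beam types (IntervalWindow, PaneInfo) are replaced by plain parameters (window start, pane index, pane timing name). FileNameSketch is a hypothetical helper, not part of Beam; the returned name is relative to the directory passed to .to(...).

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical plain-Java mirror of FileNaming.getFilename (java.time instead of Joda-Time).
final class FileNameSketch {
    // Daily "sub-folder" name, computed in the Europe/Zurich time zone like the original.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Zurich"));

    private FileNameSketch() {}

    static String fileName(
            String topic,
            String currentMillisString,
            Instant windowStart,
            long paneIndex,
            String paneTiming, // e.g. "ON_TIME" or "EARLY", the name of the pane's timing
            int shardIndex,
            int numShards,
            String suffix) {
        // <topic>/<day>/<pipeline start millis>_
        String prefix =
            String.format("%s/%s/%s_", topic, DAY.format(windowStart), currentMillisString);
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        String name = String.format(
            "pane-%d-%s-%05d-of-%05d%s",
            paneIndex, paneTiming.toLowerCase(Locale.ROOT), shardIndex, numShards, suffix);
        return prefix + name;
    }
}
```

For example, fileName("clicks", "123", Instant.parse("2018-08-16T22:30:00Z"), 2, "EARLY", 3, 10, ".txt") gives clicks/2018-08-17/123_pane-2-early-00003-of-00010.txt: a window that starts at 22:30 UTC on 16 August already falls on 17 August in Zurich, so it lands in that day's folder.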
    
answered 2018-08-17T16:18:13.747