0

现在,我有以下代码:

PCollection<String> input_data =
    pipeline
        .apply(PubsubIO
            .Read
            .withCoder(StringUtf8Coder.of())
            .named("ReadFromPubSub")
            .subscription("/subscriptions/project_name/subscription_name"));
4

3 回答 3

1

看起来您想从 pubsub 读取一些消息,并通过在空格字符上拆分消息将每个消息转换为多个部分,然后将这些部分提供给管道的其余部分。不需要对 PubsubIO 进行特殊配置,因为它不是“读取数据”问题——它是“转换你已经读取的数据”问题——你只需要插入一个 ParDo 来获取你的“复合”记录并将其分解到你想要的方式,例如:

PCollection<String> input_data =
pipeline
    .apply(PubsubIO
        .Read
        .withCoder(StringUtf8Coder.of())
        .named("ReadFromPubSub")
        .subscription("/subscriptions/project_name/subscription_name"))
    .apply(ParDo.of(new DoFn<String, String>() {
      public void processElement(ProcessContext c) {
        String composite = c.element();
        for (String part : composite.split(" ")) {
          c.output(part);
        }
      }}));
    }));
于 2015-06-10T18:19:44.060 回答
0

我认为您的意思是您想要的数据存在于 PCollection 的不同元素中,并且希望以某种方式对其进行提取和分组。

一种可能的方法是编写一个 DoFn 函数来处理 PCollection 中的每个字符串。您为要分组的每条数据输出一个键值对。然后,您可以使用 GroupByKey 转换将所有相关数据组合在一起。

例如,您的 PCollection 中有以下来自 pubsub 的消息:

  1. 用户 1234 购买了商品 A
  2. 用户 1234 购买了商品 B

DoFn 函数将输出一个键值对,其中用户 id 为键,购买的商品为值。( <1234,A> , <1234, B> )。使用 GroupByKey 转换,您可以将两个值组合到一个元素中。然后,您可以对该元素执行进一步处理。

这是大数据中一种非常常见的模式,称为 mapreduce。

于 2015-06-10T14:41:12.147 回答
0

您可以输出Iterable<A>然后使用Flatten来压缩它。不出所料,这flatMap在许多下一代数据处理平台中被称为,cf spark / flink。

于 2015-06-10T22:56:07.060 回答