2

我正在尝试编写一个概念验证,它从 Kafka 获取消息,使用 Flink 上的 Beam 对其进行转换,然后将结果推送到不同的 Kafka 主题。

我使用 KafkaWindowedWordCountExample 作为起点,这是我想做的第一部分,但它输出到文本文件而不是 Kafka。FlinkKafkaProducer08 看起来很有希望,但我不知道如何将它插入管道。我在想它会用 UnboundedFlinkSink 或类似的东西包裹起来,但这似乎不存在。

关于我想要做什么的任何建议或想法?

我正在运行最新的孵化器光束(截至昨晚来自 Github)、集群模式下的 Flink 1.0.0 和 Kafka 0.9.0.1,所有这些都在 Google Compute Engine(Debian Jessie)上。

4

2 回答 2

1

目前 Beam 中没有 UnboundedSink 类。大多数无界接收器都是使用ParDo.

您可能希望查看KafkaIO 连接器。这是一个适用于所有 Beam runner 的 Kafka 阅读器,并实现了并行读取、检查点和其他UnboundedSourceAPI。该拉取请求还包括 TopHashtags 示例管道中的粗接收器,方法是在以下位置写入 Kafka ParDo

class KafkaWriter extends DoFn<String, Void> {

  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer",    StringSerializer.class.getName(),
        "value.serializer",  StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close();
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}

当然,我们也想添加接收器支持KafkaIO。它实际上与KafkaWriter上面相同,但使用起来要简单得多。

于 2016-03-23T05:40:58.727 回答
0

用于写入 Kafka 的接收器转换已于 2016 年添加到 Apache Beam / Dataflow。有关KafkaIO使用示例,请参阅 Apache Beam 中的 JavaDoc。

于 2017-08-30T20:26:01.383 回答