There is currently no UnboundedSink class in Beam. Most unbounded sinks are implemented using a ParDo.
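To illustrate what "a sink implemented as a ParDo" can look like, here is a minimal plain-Java sketch that does not depend on Beam. BatchingWriter is a hypothetical stand-in for a DoFn: a simulated runner calls startBundle, then processElement for each element, then finishBundle. The external system is modeled by a Consumer. Batching by a hypothetical maxBatchSize is an assumption added for illustration. The point it shows is that everything buffered is written out by the end of finishBundle, before the bundle is considered done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a DoFn-based sink (not the Beam API).
class BatchingWriter {
    private final Consumer<List<String>> sink; // models the external system
    private final int maxBatchSize;            // hypothetical batching threshold
    private List<String> buffer;

    BatchingWriter(Consumer<List<String>> sink, int maxBatchSize) {
        this.sink = sink;
        this.maxBatchSize = maxBatchSize;
    }

    void startBundle() {
        buffer = new ArrayList<>();
    }

    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush(); // write a full batch as soon as it is ready
        }
    }

    void finishBundle() {
        flush(); // write whatever is left before the bundle completes
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

class BatchingWriterDemo {
    // Simulates a runner processing one bundle and returns the batches written.
    static List<List<String>> run(List<String> bundle, int maxBatchSize) {
        List<List<String>> written = new ArrayList<>();
        BatchingWriter writer = new BatchingWriter(written::add, maxBatchSize);
        writer.startBundle();
        for (String element : bundle) {
            writer.processElement(element);
        }
        writer.finishBundle();
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

Running main prints [[a, b], [c, d], [e]]: two full batches written during processElement, and the remainder flushed in finishBundle.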
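You may want to look at the KafkaIO connector. It is a Kafka reader that works with all Beam runners and implements parallel reading, checkpointing, and the other UnboundedSource APIs. The KafkaIO pull request also includes a crude sink in the TopHashtags example pipeline, which writes to Kafka from a ParDo: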
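import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Writes each element to a Kafka topic. Options is the TopHashtags pipeline's
// options interface, which supplies the output topic and bootstrap servers.
class KafkaWriter extends DoFn<String, Void> {
  private final String topic;
  private final Map<String, Object> config;
  // KafkaProducer is not serializable, so it is created on the worker.
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer", StringSerializer.class.getName(),
        "value.serializer", StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close(); // blocks until previously sent records complete
    producer = null;  // so the next startBundle creates a fresh producer
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}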
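Because a runner may reuse the same DoFn instance for several bundles, a producer closed in finishBundle must not be reused by the next bundle. The plain-Java simulation below, which depends on neither Beam nor Kafka, mimics that reuse. FakeProducer is a hypothetical stand-in for KafkaProducer that rejects send() after close(), and LifecycleWriter follows KafkaWriter's startBundle/finishBundle pattern while clearing the reference after close() so the next startBundle creates a fresh producer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaProducer: records sends, rejects use after close().
class FakeProducer {
    final List<String> sent = new ArrayList<>();
    private boolean closed = false;

    void send(String value) {
        if (closed) {
            throw new IllegalStateException("send after close");
        }
        sent.add(value);
    }

    void close() {
        closed = true;
    }
}

// Mirrors KafkaWriter's lifecycle; one instance is reused across bundles.
class LifecycleWriter {
    final List<FakeProducer> created = new ArrayList<>();
    private FakeProducer producer = null;

    void startBundle() {
        if (producer == null) { // may be called once per bundle on the same instance
            producer = new FakeProducer();
            created.add(producer);
        }
    }

    void processElement(String element) {
        producer.send(element);
    }

    void finishBundle() {
        producer.close();
        producer = null; // without this, the next bundle would reuse a closed producer
    }
}

class LifecycleDemo {
    // Simulates a runner feeding several bundles to a single writer instance.
    static LifecycleWriter runBundles(List<List<String>> bundles) {
        LifecycleWriter writer = new LifecycleWriter();
        for (List<String> bundle : bundles) {
            writer.startBundle();
            for (String element : bundle) {
                writer.processElement(element);
            }
            writer.finishBundle();
        }
        return writer;
    }

    public static void main(String[] args) {
        LifecycleWriter w = runBundles(List.of(List.of("a", "b"), List.of("c")));
        System.out.println(w.created.size());
        System.out.println(w.created.get(1).sent);
    }
}
```

Running main prints 2 and then [c]: each bundle gets its own producer. If the producer = null line is removed, the second bundle calls send() on the closed producer and FakeProducer throws IllegalStateException.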
Of course, we would also like to add sink support to KafkaIO. It would do essentially the same thing as the KafkaWriter above, but be much simpler to use.