我正在编写一个简单的词频统计(word count)程序:以一个 Kafka 主题(由生产者写入)作为输入源,然后对其应用 ParDo 来统计每个单词出现的次数。现在我需要根据单词的出现次数把它们写入不同的主题:假设出现次数为偶数的单词写入主题 1,其余的单词写入主题 2。
谁能帮我举个例子?
我正在编写一个简单的词频统计(word count)程序:以一个 Kafka 主题(由生产者写入)作为输入源,然后对其应用 ParDo 来统计每个单词出现的次数。现在我需要根据单词的出现次数把它们写入不同的主题:假设出现次数为偶数的单词写入主题 1,其余的单词写入主题 2。
谁能帮我举个例子?
这可以通过 KafkaIO 的 writeRecords 方法来完成:该方法接收 ProducerRecord<key, value> 类型的元素,每条记录通过 new ProducerRecord<>("topic_name", "key", "value") 构造,因此可以为每条记录单独指定目标主题 -
代码如下:
/**
 * Splits each input line into tokens using {@code ExampleUtils.TOKENIZER_PATTERN} and emits
 * every non-empty token as its own element.
 *
 * <p>Two metrics are maintained along the way: a distribution of line lengths and a counter
 * of lines that are empty after trimming.
 */
static class ExtractWordsFn extends DoFn<String, String> {
  // Distribution of the character length of every line seen.
  private final Distribution lineLengths =
      Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
  // Number of lines that contain nothing but whitespace.
  private final Counter blankLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

  /**
   * Records metrics for one line and outputs each of its non-empty tokens.
   *
   * @param element the line to tokenize
   * @param receiver sink for the extracted tokens
   */
  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> receiver) {
    lineLengths.update(element.length());

    boolean blank = element.trim().isEmpty();
    if (blank) {
      blankLines.inc();
    }

    // A limit of -1 keeps trailing empty strings in the split result; they are skipped below.
    for (String token : element.split(ExampleUtils.TOKENIZER_PATTERN, -1)) {
      if (token.isEmpty()) {
        continue;
      }
      receiver.output(token);
    }
  }
}
/**
 * Converts a word/count pair into the Kafka record to publish, choosing the destination
 * topic from the parity of the count.
 *
 * <p>Words with an even count go to topic {@code "test"}; all other words go to topic
 * {@code "copy"}. The record key is the word and the record value is
 * {@code "<word> <count>"}.
 */
public static class FormatAsTextFn
    extends SimpleFunction<KV<String, Long>, ProducerRecord<String, String>> {
  // Destination for words whose count is even.
  private static final String EVEN_COUNT_TOPIC = "test";
  // Destination for every other word.
  private static final String ODD_COUNT_TOPIC = "copy";

  /**
   * Builds the output record for one counted word.
   *
   * @param input the word (key) and the number of times it occurred (value)
   * @return a record addressed to the topic that matches the count's parity
   */
  @Override
  public ProducerRecord<String, String> apply(KV<String, Long> input) {
    String word = input.getKey();
    long count = input.getValue();
    String topic = (count % 2 == 0) ? EVEN_COUNT_TOPIC : ODD_COUNT_TOPIC;
    // Diamond operator instead of the raw ProducerRecord type keeps the construction type-safe.
    return new ProducerRecord<>(topic, word, word + " " + count);
  }
}
/**
 * Composite transform that turns a collection of text lines into per-word occurrence counts.
 *
 * <p>Lines are tokenized by {@link ExtractWordsFn} and the resulting words are counted with
 * {@code Count.perElement()}.
 */
public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  /**
   * Expands this transform into its tokenize and count steps.
   *
   * @param lines the input lines of text
   * @return each distinct word paired with the number of times it occurred
   */
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    return lines
        .apply(ParDo.of(new ExtractWordsFn()))
        .apply(Count.perElement());
  }
}
// Word-count pipeline: read records from Kafka, count words per one-minute fixed window,
// convert each count to a ProducerRecord and write it back to Kafka.
// NOTE(review): the source topic "copy" is also one of the topics FormatAsTextFn assigns to
// its records, so output may be fed back into the input — confirm this is intended.
p.apply("ReadLines", KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("copy")// use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
// NOTE(review): presumably merged with the group.id setting above rather than replacing it — confirm.
.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
.withLogAppendTime()
.withReadCommitted()
// NOTE(review): auto-commit is enabled above and offsets are also committed on finalize
// here; this looks redundant — confirm against the KafkaIO documentation.
.commitOffsetsInFinalize()
// NOTE(review): withLogAppendTime() above and withProcessingTime() here both look like
// timestamp-policy choices; presumably only one takes effect — confirm which.
.withProcessingTime()
.withoutMetadata()
)
// Keep only the record values; the windowing step below is typed on String.
.apply(Values.create())
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn())) // yields PCollection<ProducerRecord<String, String>>
// The coder for the ProducerRecord elements is set explicitly here.
.setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.apply("WriteCounts", (KafkaIO.<String, String>writeRecords()
.withBootstrapServers("localhost:9092")
// No default topic is configured (the call below is commented out); presumably the topic
// carried by each ProducerRecord is used instead — confirm.
//.withTopic("test")
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
))