0

我正在运行一个简单的字数统计程序,其中我使用一个 Kafka 主题(生产者)作为输入源,然后对其应用 ParDo 以统计词频。现在我需要帮助:根据词频将单词写入不同的主题。假设所有词频为偶数的单词写入主题 1,其余的写入主题 2。

谁能帮我举个例子?

4

1 回答 1

0

这可以使用 KafkaIO 的 writeRecords() 方法来完成,该方法接收 ProducerRecord&lt;key,value&gt; 元素,通过 new ProducerRecord&lt;&gt;("topic_name", key, value) 即可在每条记录上指定目标主题 —

下面是代码-:

     /** Splits each input line into individual words and emits the non-empty ones. */
     static class ExtractWordsFn extends DoFn<String, String> {
        // Counts lines that contain nothing but whitespace.
        private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
        // Tracks the distribution of input line lengths.
        private final Distribution lineLenDist =
                Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

        @ProcessElement
        public void processElement(@Element String element, OutputReceiver<String> receiver) {
            // Record the line length for the metrics distribution.
            lineLenDist.update(element.length());
            if (element.trim().isEmpty()) {
                emptyLines.inc();
            }
            // Tokenize the line and forward every non-empty token downstream.
            for (String token : element.split(ExampleUtils.TOKENIZER_PATTERN, -1)) {
                if (token.isEmpty()) {
                    continue;
                }
                receiver.output(token);
            }
        }
    }

   
    /**
     * Maps each (word, count) pair to a Kafka {@link ProducerRecord}, choosing the
     * destination topic by the parity of the count: even counts go to "test",
     * odd counts go to "copy". The record value is "word count".
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, ProducerRecord<String,String>> {
        @Override
        public ProducerRecord<String, String> apply(KV<String, Long> input) {
            // Route by frequency parity; original used raw ProducerRecord and
            // unbraced if/else with the construction expression duplicated.
            String topic = (input.getValue() % 2 == 0) ? "test" : "copy";
            return new ProducerRecord<>(topic, input.getKey(), input.getKey() + " " + input.getValue().toString());
        }
    }

    /** Composite transform: splits lines into words and counts each word's occurrences. */
    public static class CountWords
            extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
            // Tokenize, then count occurrences per distinct word.
            return lines
                    .apply(ParDo.of(new ExtractWordsFn()))
                    .apply(Count.perElement());
        }
    }
 // Pipeline: read from Kafka topic "copy", window into 1-minute fixed windows,
 // count words, then write each ProducerRecord to the topic chosen by FormatAsTextFn.
 // NOTE(review): `p` (the Pipeline) is declared elsewhere; this is a fragment.
 p.apply("ReadLines", KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("copy")// use withTopics(List<String>) to read from multiple topics.
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
                .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                .withLogAppendTime() // use the broker's log-append time as the record timestamp
                .withReadCommitted() // only read records from committed transactions
                .commitOffsetsInFinalize() // commit consumed offsets when the checkpoint is finalized
                .withProcessingTime()
                .withoutMetadata() // drop Kafka metadata; yields PCollection<KV<Long, String>>
        )
.apply(Values.create()) // keep only the record values (the text lines)
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(new CountWords())  
.apply(MapElements.via(new FormatAsTextFn())) //PCollection<ProducerRecord<string,string>>
// Coder must be set explicitly: ProducerRecord has no default coder registered.
.setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.apply("WriteCounts", (KafkaIO.<String, String>writeRecords()
 .withBootstrapServers("localhost:9092")
 //.withTopic("test") // no sink-level topic: each ProducerRecord carries its own topic
 .withKeySerializer(StringSerializer.class)
 .withValueSerializer(StringSerializer.class)
                ))
于 2020-07-07T06:38:37.267 回答