0

我已经编写了一个示例拓扑,它将使用来自 kafka 的消息并记录它。请在下面找到代码片段

端到端拓扑很好。当我在 Kafka Producer 中发布消息时,它会被正确使用。我只是收到消息并将其记录在 MessagePrinter 中。

问题描述如下

用例 1:我关闭了拓扑,发送了消息 1-10,当我启动拓扑时,拓扑正确记录了消息 2-10,并且仅第一条消息没有被记录。

用例 2:拓扑启动时会发生同样的问题,第一条消息没有被处理,即将到来的消息被拓扑正确记录

public class Topology extends BaseTopologyBuilder { 
//overridden "apply" method where from the record only message is extracted out "record.value()"
private static Func<ConsumerRecord<String, String>, List<Object>> FUNCTION = new CommandValueFunction();
public void config() throws IOException {
    config = new Config();
    config.setDebug(false);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1)));
    config.setNumWorkers(1);
    config.put("topology.spout.max.batch.size", 1);
}
protected KafkaSpoutConfig<String, String> spoutConfig(String topic) {
    return KafkaSpoutConfig
            .builder(localhost:9092, topic)
            .setGroupId("kafkaSpoutTestGroup")
            .setMaxPartitionFectchBytes(2000000000)
            .setRecordTranslator(FUNCTION, new Fields("message"))
            .setRetry(newRetryService()).setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
            .setMaxUncommittedOffsets(250)
            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .build();
}
//MessagePrinter extends BaseFunction and tuples are printed in execute()
public void buildTopology(MessagePrinter bolt) {
    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout", new KafkaTridentSpoutOpaque<>(spoutConfig("sample")));
    stream.each(new Fields("message"), bolt, new Fields()).parallelismHint(2);
    try {
        config.put("zookeeper.ip", localhost:2182);
        StormSubmitter.submitTopology("topology", config, topology.build());
    } catch (Exception exception) { }
}
public static void main(String[] args) throws Exception {
    Topology topologyBuilder = new Topology();
    topologyBuilder.config();
    topologyBuilder.buildTopology();
}}
4

0 回答 0