2

我有一个基本拓扑,包括 kafka spout 和 kafka bolts 当提交我的拓扑时,我在 Storm UI 中收到此错误

无法获得 kafka 的偏移滞后。原因:org.apache.kafka.shaded.common.errors.InvalidTopicException: Topic '[enrich-topic]' 无效

我检查了丰富主题是否存在,没有问题

    TopologyBuilder streamTopologyBuilder = new TopologyBuilder();

    KafkaSpoutRetryService kafkaSpoutRetryService =  new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));

    KafkaSpoutConfig spoutConf =  KafkaSpoutConfig.builder(configProvider.getBootstrapServers(), configProvider.getSpoutTopic())
            .setGroupId("consumerGroupId")
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
            .setMaxUncommittedOffsets(1000000)
            .setRetry(kafkaSpoutRetryService)
            .build();
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
    streamTopologyBuilder.setSpout("kafkaSpout", kafkaSpout, 1);

    KafkaWriterBolt2 kafkaWriterBolt2 = null;
    try {
        kafkaWriterBolt2 = new KafkaWriterBolt2(configProvider.getBootstrapServers(), configProvider.getStreamKafkaWriterTopicName());
    } catch (IOException e) {
        e.printStackTrace();
    }
    streamTopologyBuilder.setBolt("kafkaWriterBolt2", kafkaWriterBolt2, 1).setNumTasks(1)
            .shuffleGrouping("kafkaSpout");

KafkaWriterBolt2是我的类扩展自 BaseRichBolt

4

0 回答 0