我编写了一个示例拓扑,它消费来自 Kafka 的消息并将其记录到日志中。代码片段见下文。
端到端拓扑运行正常:通过 Kafka Producer 发布的消息能够被正确消费。我只是在 MessagePrinter 中接收消息并将其记录下来。
问题描述如下
用例 1:先关闭拓扑,发送消息 1-10,然后启动拓扑;拓扑正确记录了消息 2-10,只有第一条消息没有被记录。
用例 2:拓扑已启动(运行中)时也会出现同样的问题:第一条消息没有被处理,而后续消息都被拓扑正确记录。
public class Topology extends BaseTopologyBuilder {
//overridden "apply" method where from the record only message is extracted out "record.value()"
private static Func<ConsumerRecord<String, String>, List<Object>> FUNCTION = new CommandValueFunction();
public void config() throws IOException {
config = new Config();
config.setDebug(false);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1)));
config.setNumWorkers(1);
config.put("topology.spout.max.batch.size", 1);
}
protected KafkaSpoutConfig<String, String> spoutConfig(String topic) {
return KafkaSpoutConfig
.builder(localhost:9092, topic)
.setGroupId("kafkaSpoutTestGroup")
.setMaxPartitionFectchBytes(2000000000)
.setRecordTranslator(FUNCTION, new Fields("message"))
.setRetry(newRetryService()).setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
.setMaxUncommittedOffsets(250)
.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.build();
}
//MessagePrinter extends BaseFunction and tuples are printed in execute()
public void buildTopology(MessagePrinter bolt) {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout", new KafkaTridentSpoutOpaque<>(spoutConfig("sample")));
stream.each(new Fields("message"), bolt, new Fields()).parallelismHint(2);
try {
config.put("zookeeper.ip", localhost:2182);
StormSubmitter.submitTopology("topology", config, topology.build());
} catch (Exception exception) { }
}
public static void main(String[] args) throws Exception {
Topology topologyBuilder = new Topology();
topologyBuilder.config();
topologyBuilder.buildTopology();
}}