我试图整合卡夫卡风暴。我刚开始举几个例子。
我能够从 GitHub 运行示例。接下来,我尝试在 Eclipse 中编写一个 Producer 类,以使用 KAFKA PRODUCER API 将消息发布到 kafka 主题。
场景1:
当我的消费者外壳使用说主题测试运行时,我运行我的生产者类。我能够看到我的消费者外壳与所有已发布的消息。
情景2
我还没有启动我的消费者外壳(说消费者已关闭)。我经营我的制作人课程。消息正在发布到 kafka。
现在,如果消息已发布,现在如果我启动消费者 shell,则在停机后,它不会读取已发布的消息主题。
为什么?我想它会维护主题消费的日志。不应该是看消息吗?
有没有我需要提及的配置参数?
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for ( int nEvents=0; nEvents<events;nEvents++)
{
String ip="192.168.2."+rnd.nextInt(255);
String msg=getNextTradeData(); // Class to generate data
KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg);
Thread.sleep(100);
System.out.println(msg);
producer.send(data);
}
producer.close();
}
或者我需要做些什么来改变消费者。我正在使用包中提供的消费者外壳,并使用它启动它
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic