3

我试图整合卡夫卡风暴。我刚开始举几个例子。

我能够从 GitHub 运行示例。接下来,我尝试在 Eclipse 中编写一个 Producer 类,以使用 KAFKA PRODUCER API 将消息发布到 kafka 主题。

场景1:

当我的消费者外壳使用说主题测试运行时,我运行我的生产者类。我能够看到我的消费者外壳与所有已发布的消息。

情景2

我还没有启动我的消费者外壳(说消费者已关闭)。我经营我的制作人课程。消息正在发布到 kafka。

现在,如果消息已发布,现在如果我启动消费者 shell,则在停机后,它不会读取已发布的消息主题。

为什么?我想它会维护主题消费的日志。不应该是看消息吗?

有没有我需要提及的配置参数?

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

        for ( int nEvents=0; nEvents<events;nEvents++)
        {
          String ip="192.168.2."+rnd.nextInt(255);
          String msg=getNextTradeData(); // Class to generate data
          KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg);
          Thread.sleep(100);
          System.out.println(msg);
          producer.send(data);  

        }
producer.close();

}

或者我需要做些什么来改变消费者。我正在使用包中提供的消费者外壳,并使用它启动它

  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic
4

1 回答 1

1

当您启动时,kafka-console-consume它将从当前偏移量读取。这是,偏移NOW()而不是从过去。

要查看消息是否已发布,您有两种选择:

  1. 使用--from-beginning选项,从主题的开头阅读

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic --from-beginning

  2. --consumer.config使用选项在 zookeeper/kafka 中持久化控制台消费者的状态

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic --consumer.config /home/sql-injection/consumer-config.txt

根据这个不错的页面,您在消费者配置上需要的参数是:consumer.idclient.id

于 2015-10-13T09:10:31.743 回答