2

我试图产生一些消息并放入主题,然后从控制台消费者那里获取相同的信息。

使用的代码:

import  java.util.Date;
import  java.util.Properties;
import  kafka.javaapi.producer.Producer;
import  kafka.producer.KeyedMessage;
import  kafka.producer.ProducerConfig;


public  class   SimpleProducer  {
    private static  Producer<String,String> producer;
    public  SimpleProducer()    {
            Properties  props   =   new Properties();

            //  Set the broker  list    for requesting  metadata    to  find    the lead    broker
            props.put("metadata.broker.list","172.22.96.56:9092,172.22.96.56:9093,172.22.96.56:9094");

            //This  specifies   the serializer  class   for keys    
            props.put("serializer.class",   "kafka.serializer.StringEncoder");

            //  1   means   the producer    receives    an  acknowledgment  once    the lead replica    
            //  has received    the data.   This    option  provides    better  durability  as  the     
            //  client  waits   until   the server  acknowledges    the request as successful.
            props.put("request.required.acks",  "1");

            ProducerConfig  config  =   new ProducerConfig(props);
            producer    =   new Producer<String,    String>(config);
    }
    public  static  void    main(String[]   args)   {
            int argsCount   =   args.length;
            if  (argsCount  ==  0   ||  argsCount   ==  1)
                    throw   new IllegalArgumentException(
                            "Please provide topic   name    and Message count   as  arguments");

            String  topic   =   (String)    args[0];
            String  count   =   (String)    args[1];
            int messageCount    =   Integer.parseInt(count);
            System.out.println("Topic   Name    -   "   +   topic);
            System.out.println("Message Count   -   "   +   messageCount);
            SimpleProducer  simpleProducer  =   new SimpleProducer();
            simpleProducer.publishMessage(topic,    messageCount);
    }
    private void    publishMessage(String   topic,  int messageCount)   {
            for (int    mCount  =   0;  mCount  <   messageCount;   mCount++)   {
                    String  runtime =   new Date().toString();
                    String  msg =   "Message    Publishing  Time    -   "   +   runtime;
                    System.out.println(msg);
                    //  Creates a   KeyedMessage    instance
                    KeyedMessage<String,    String> data    =   
                            new KeyedMessage<String,    String>(topic,  msg);

                    //  Publish the message
                    producer.send(data);
            }
            //  Close   producer    connection  with    broker.
            producer.close();
    }

}

输出:

主题名称 - 测试消息计数 - 10 log4j:WARN 找不到记录器 (kafka.utils.VerifiableProperties) 的附加程序。log4j:WARN 请正确初始化 log4j 系统。消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 年消息发布时间 - 2 月 16 日星期二:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016

从命令行我提供主题名称为“kafkatopic”和消息计数“10”。该程序运行良好,没有异常,但是当我尝试从控制台查看消息时,它们没有出现。主题已创建。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

你能帮忙看看出了什么问题!!

4

1 回答 1

0

我想指出两点:

1)你没有--zookeeper在这里指定 - 你应该--bootstrap-server争论。

2) 您应该看到server.properties文件中关于listenersand的内容advertised.listener。您应该正确地将它们指向经纪人。

我希望这有帮助。

于 2017-05-11T07:18:14.393 回答