1

使用 Kafka 生成消息

    import java.util.Date;
import java.util.Properties;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;





public class KafkaProducer {

    private static Producer<String, String> producer;
    public KafkaProducer()
    {
        Properties props = new Properties();
        props.put("metadata.broker.list","localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks","1");

        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String,String>(config);

    }

    public static void main(String[] args)
    {
        if(args.length<2)
        {
            System.err.println("Usage: KafkaProducer TopicName MessageCount");
            System.exit(0);
        }
        String topic = args[0];
        int messageCount = Integer.parseInt(args[1]);

        KafkaProducer kafka = new KafkaProducer();
        kafka.publishMessage(topic,messageCount);

    }

    private void publishMessage(String topic, int messageCount)
    {
        for(int mcount=0;mcount<messageCount;mcount++)
        {
            String runtime = new Date().toString();
            String msg = "Message Published Time -" + runtime;
            System.out.println(msg);

            KeyedMessage<String,String> data = new KeyedMessage<String,String>(topic,msg);
            producer.send(data);

        }
        producer.close();
    }

}

在使用 eclipse 运行这个程序时,我得到了以下异常:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Message Published Time -Fri Jul 10 13:05:20 IST 2015
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at KafkaProducer.publishMessage(KafkaProducer.java:52)
    at KafkaProducer.main(KafkaProducer.java:39)

Zookeeper 服务启动,Broker 启动,Topic 创建。消费者也准备好了。

有人可以帮我解决这个问题吗?

4

2 回答 2

1

您可以尝试使用kafka 文档中简要描述的新 KafkaProducer 。

注意导入org.apache.kafka.clients.producer.*而不是类似的kafka.javaapi.producer.Producer东西:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        // set up Kafka producer
        KafkaProducer<String,String> kafkaProducer;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // instantiate the producer
        kafkaProducer = new KafkaProducer<String,String>(props);

        // add data to kafka
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("topic", "test key", "test value");
        kafkaProducer.send(producerRecord);

        // close producer
        kafkaProducer.close();
    }
}
于 2015-07-10T09:08:26.460 回答
0

您能否检查一下您的 Kafka 服务器是否在生产者 API 使用的同一端口上运行?

通常 Kafka 集群在端口 9092 上运行。如果您的设置是这种情况,请在生产者配置中使用相同的端口。您的生产者正在使用端口 2181。这可能是一个错误。

于 2015-07-11T01:50:47.457 回答