3

我想向 Kafka 服务器发送消息。经纪人名单不包括本地主机。但是当produce调用send方法时出现异常:Producer connection to localhost:9092 unsuccessful

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

// create a producer

Properties props = new Properties();

props.put("metadata.broker.list", "192.168.1.203:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

producer = new Producer<String, String>(config);

//sending...
String topic = "data_collect_events";
String message = "_Message_1";
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message);
producer.send(keyedMessage);

例外:

ERROR [main] 2013-07-23 19:27:10,580 kafka.utils.Logging$class: Producer connection to localhost:9092 unsuccessful
java.net.ConnectException: Connection refused: connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:364)
    at sun.nio.ch.Net.connect(Net.java:356)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at enter code herekafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
4

3 回答 3

5

在 kafka 0.8 中,代理列表仅用于检索元数据。然后,生产者使用返回的元数据信息连接到代理。元数据中的主机名取决于主机的操作系统设置(请参阅 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F )。

您可以在 server.properties 中设置 host.name。像这样

host.name=192.168.1.203
于 2013-12-11T08:43:15.073 回答
4

您很可能正在使用 Kafka 0.7 和 Kafka 0.8 的配置键,所以

props.put("metadata.broker.list", "192.168.1.203:9092");

当没有指定代理时,简单地忽略并且生产者默认为localhost:9092 。
你必须使用 broker.list而不是meta.broker.list.

于 2013-07-24T09:06:57.693 回答
0

我遇到了类似的问题,即位于 EC2 外部的 Kafka 生产者无法按照 Zookeeper 的预期解析内部 EC2 IP。

我编辑了该/etc/hosts文件以添加一个条目: public-ip&internal-ec2-ip 以便生产者与 Kafka 代理交谈。

于 2014-03-05T23:30:39.060 回答