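I am trying to send messages to a topic on a Kafka broker using a producer written in Java. Sending messages to the topic from the console with kafka-console-producer.sh works. However, when I try the same with the Java producer, I get a TimeoutException with a message saying that metadata could not be fetched after 100000 ms. The producer code and Kafka's server.properties are attached below.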

getProducer():

private synchronized Producer<String, String> getProducer() {
    if (!producer.isPresent()) {
        Properties prodProps = new Properties();
        prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,127.0.0.1:9092");
        prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100000);
        prodProps.put(ProducerConfig.ACKS_CONFIG, "all");
        prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        setProducer(new KafkaProducer<>(prodProps));
    }
    return producer.get();
}
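
A quick check that is independent of the Kafka client is whether each address in the bootstrap.servers value above accepts TCP connections from the same JVM. The following is a minimal sketch using only the JDK. `BootstrapCheck`, `parse` and `isReachable` are hypothetical helper names, not part of the Kafka API, and a successful connection only shows that the port is open, not that a metadata request to the broker will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of the Kafka client library.
class BootstrapCheck {

    // Splits a bootstrap.servers value such as "localhost:9092,127.0.0.1:9092"
    // into unresolved host:port addresses.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port, got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same value as BOOTSTRAP_SERVERS_CONFIG in getProducer().
        for (InetSocketAddress addr : parse("localhost:9092,127.0.0.1:9092")) {
            boolean ok = isReachable(addr.getHostString(), addr.getPort(), 2000);
            System.out.println(addr.getHostString() + ":" + addr.getPort() + " reachable=" + ok);
        }
    }
}
```

If every address is reachable here but the producer still times out, the problem is more likely in the client configuration or the broker's advertised address than in basic connectivity.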

publishPayload():

private Future<RecordMetadata> publishPayload(DataObject dataObject) {
    String topic = ...;   // topic name
    String key = ...;     // unique ID
    String payload = ...; // String payload
    return getProducer().send(new ProducerRecord<>(topic, key, payload));
}

server.properties:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=127.0.0.1

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=127.0.0.1

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=204857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

producer.properties:

# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder
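
The keys in the producer properties listed above (metadata.broker.list, producer.type, compression.codec, serializer.class) belong to the legacy Scala producer. The Java KafkaProducer used in getProducer() is configured through keys such as bootstrap.servers, compression.type, key.serializer and value.serializer, and the code shown builds its Properties in memory rather than loading this file. As a minimal sketch, the hypothetical helper below (`LegacyProducerKeys` and `findLegacyKeys` are illustrative names, not Kafka APIs) flags legacy keys in a properties text using only the JDK. The suggested replacements are rough guidance and assume a reasonably recent Java client.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper; not part of the Kafka client library.
class LegacyProducerKeys {

    // Legacy (Scala producer) key -> rough counterpart for the Java producer.
    static final Map<String, String> LEGACY_TO_NEW = new LinkedHashMap<>();
    static {
        LEGACY_TO_NEW.put("metadata.broker.list", "bootstrap.servers");
        LEGACY_TO_NEW.put("compression.codec", "compression.type");
        LEGACY_TO_NEW.put("serializer.class", "key.serializer / value.serializer");
        LEGACY_TO_NEW.put("producer.type",
                "no direct equivalent; send() is asynchronous, call get() on the Future to block");
    }

    // Parses the text as a properties file and returns the legacy keys it
    // contains, each mapped to its suggested counterpart (sorted by key).
    static Map<String, String> findLegacyKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> found = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            String replacement = LEGACY_TO_NEW.get(key);
            if (replacement != null) {
                found.put(key, replacement);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        String text = "metadata.broker.list=localhost:9092\n"
                + "producer.type=sync\n"
                + "compression.codec=none\n"
                + "serializer.class=kafka.serializer.DefaultEncoder\n";
        findLegacyKeys(text).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```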

Please let me know if I am missing something here and how I can get the Java producer to communicate with the Kafka topic.
