I have this Java code that tries to add a message to a Kafka queue:

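    // Imports assumed for the 0.8.x Java API of the legacy producer.
    import java.util.Properties;
    import java.util.UUID;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    String msgID = UUID.randomUUID().toString();

    // Configuration for the legacy producer; DEFAULT_BROKER is a comma-separated host:port list.
    Properties prop = new Properties();
    prop.put("metadata.broker.list", DEFAULT_BROKER);
    prop.put("serializer.class", "kafka.serializer.StringEncoder");
    prop.put("request.required.acks", "-1");
    prop.put("producer.type", "async");

    ProducerConfig config = new ProducerConfig(prop);
    Producer<String, String> producer = new Producer<String, String>(config);

    KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC_NAME, msgID, "stackoverflow");

    try {
        // With producer.type=async, send() only enqueues the message;
        // the actual network send happens later on the producer's send thread.
        producer.send(message);
        LOG.info("Messages sent with key" + msgID);
    } catch (Exception exception) {
        // Log the failure instead of silently swallowing it.
        LOG.error("Failed to send message with key " + msgID, exception);
    } finally {
        // Closing flushes the queue and shuts down the send thread.
        producer.close();
    }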

The Kafka brokers run on remote Linux machines secured with Kerberos.

When I run the Java code above from Eclipse on my local Windows machine, I get the following log output:

2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Verifying properties
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property metadata.broker.list is overridden to remote000001.machine.test.group:6667,remote000002.machine.test.group:6667,remote000003.machine.test.group:6667
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property producer.type is overridden to async
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property request.required.acks is overridden to -1
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property serializer.class is overridden to kafka.serializer.StringEncoder
2016-01-06 15:48:58 TRACE Producer:36 - Added to send queue an event: KeyedMessage(stackoverflowTopic,2f0c8b29-46e0-4860-961b-0fea480d21f9,2f0c8b29-46e0-4860-961b-0fea480d21f9,stackoverflow)
2016-01-06 15:48:58 TRACE Producer:36 - Remaining queue size: 10000
2016-01-06 15:48:58 INFO  KafkaPro:63 - Messages sent with key2f0c8b29-46e0-4860-961b-0fea480d21f9
2016-01-06 15:48:58 INFO  Producer:68 - Shutting down producer
2016-01-06 15:48:58 TRACE ProducerSendThread:36 - Dequeued item for topic stackoverflowTopic, partition key: 2f0c8b29-46e0-4860-961b-0fea480d21f9, data: stackoverflow
2016-01-06 15:48:58 INFO  ProducerSendThread:68 - Begin shutting down ProducerSendThread
2016-01-06 15:48:58 DEBUG ProducerSendThread:52 - Handling 1 events
2016-01-06 15:48:58 DEBUG DefaultEventHandler:52 - Handling 1 events
2016-01-06 15:48:58 TRACE SyncProducer:36 - Instantiating Scala Sync Producer with properties: {metadata.broker.list=remote000001.machine.test.group:6667,remote000002.machine.test.group:6667,remote000003.machine.test.group:6667, request.required.acks=-1, port=6667, serializer.class=kafka.serializer.StringEncoder, host=remote000002.machine.test.group, producer.type=async}
2016-01-06 15:48:58 INFO  ClientUtils$:68 - Fetching metadata from broker id:1,host:remote000002.machine.test.group,port:6667 with correlation id 0 for 1 topic(s) Set(stackoverflowTopic)
2016-01-06 15:48:58 TRACE SyncProducer:36 - verifying sendbuffer of size 34
2016-01-06 15:48:58 INFO  SyncProducer:68 - Connected to remote000002.machine.test.group:6667 for producing
2016-01-06 15:48:58 INFO  SyncProducer:68 - Disconnecting from remote000002.machine.test.group:6667
2016-01-06 15:48:58 WARN  ClientUtils$:89 - Fetching topic metadata with correlation id 0 for topics [Set(stackoverflowTopic)] from broker [id:1,host:remote000002.machine.test.group,port:6667] failed
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:94)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
2016-01-06 15:48:58 INFO  SyncProducer:68 - Disconnecting from remote000002.machine.test.group:6667
2016-01-06 15:48:58 TRACE SyncProducer:36 - Instantiating Scala Sync Producer with properties: {metadata.broker.list=remote000001.machine.test.group:6667,remote000002.machine.test.group:6667,remote000003.machine.test.group:6667, request.required.acks=-1, port=6667, serializer.class=kafka.serializer.StringEncoder, host=remote000001.machine.test.group, producer.type=async}
2016-01-06 15:48:58 INFO  ClientUtils$:68 - Fetching metadata from broker id:0,host:remote000001.machine.test.group,port:6667 with correlation id 0 for 1 topic(s) Set(stackoverflowTopic)
2016-01-06 15:48:58 TRACE SyncProducer:36 - verifying sendbuffer of size 34
2016-01-06 15:48:58 INFO  SyncProducer:68 - Connected to remote000001.machine.test.group:6667 for producing
2016-01-06 15:48:58 INFO  SyncProducer:68 - Disconnecting from remote000001.machine.test.group:6667
2016-01-06 15:48:58 WARN  ClientUtils$:89 - Fetching topic metadata with correlation id 0 for topics [Set(stackoverflowTopic)] from broker [id:0,host:remote000001.machine.test.group,port:6667] failed

I created a kafka_jaas.conf file with the following content:

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="myPrincipal";
};
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="myPrincipal";
};

and passed its location as a JVM argument:

 -Djava.security.auth.login.config=C:\\kafka_jaas.conf
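
To rule out the simplest failure mode, it may help to confirm from inside the JVM that the property was actually picked up and that it points at a readable file. The following is only a diagnostic sketch: the class name `JaasConfigCheck` and its helper method are hypothetical, and it assumes the property holds a plain file path rather than a URL.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Kafka or the JDK.
final class JaasConfigCheck {

    static final String JAAS_PROPERTY = "java.security.auth.login.config";

    // Returns true if the system property is set and names a readable file.
    // Assumption: the value is a plain file path, not a file: URL.
    static boolean jaasFileIsReadable() {
        String location = System.getProperty(JAAS_PROPERTY);
        if (location == null || location.isEmpty()) {
            return false;
        }
        Path path = Paths.get(location);
        return Files.isReadable(path);
    }

    public static void main(String[] args) {
        System.out.println(JAAS_PROPERTY + " = " + System.getProperty(JAAS_PROPERTY));
        System.out.println("readable: " + jaasFileIsReadable());
    }
}
```

Running this with the same JVM arguments as the producer (for example from the same Eclipse run configuration) shows whether the file location reaches the process at all. It does not verify that any client code actually reads the `KafkaClient` section.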

Why doesn't this approach work?

How do you run a Kafka producer that writes messages to brokers secured with Kerberos?
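
One direction that may be worth checking: in Apache Kafka, SASL/Kerberos support arrived in 0.9.0 for the newer Java client (`org.apache.kafka.clients.producer.KafkaProducer`). The legacy `kafka.producer.Producer` has no security settings in its configuration, which would be consistent with the brokers closing the connection. Vendor builds may differ. Below is a minimal sketch of the properties such a client would take, built with plain `java.util.Properties` so it compiles without the Kafka jar. The class name `KerberosProducerConfigSketch` is hypothetical, and `SASL_PLAINTEXT` assumes the brokers expose a SASL listener without TLS.

```java
import java.util.Properties;

// Hypothetical helper; it only assembles configuration and does not connect to Kafka.
final class KerberosProducerConfigSketch {

    // Builds properties intended for org.apache.kafka.clients.producer.KafkaProducer (0.9.0+).
    static Properties build(String bootstrapServers, String serviceName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Rough equivalent of request.required.acks=-1 in the legacy producer.
        props.put("acks", "all");
        // Assumption: SASL without TLS; SASL_SSL would be used for an encrypted listener.
        props.put("security.protocol", "SASL_PLAINTEXT");
        // Should match the primary of the brokers' Kerberos principal.
        props.put("sasl.kerberos.service.name", serviceName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("remote000001.machine.test.group:6667", "kafka");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the Kafka client library on the classpath, these properties would be passed to `new KafkaProducer<String, String>(props)`, and the `KafkaClient` section of the JAAS file would supply the credentials. Whether port 6667 on these brokers is a SASL listener is something only the cluster configuration can confirm.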
