8

当我产生消息时,我想从经纪人那里得到一些回应。我已经尝试过使用的 CallBack 机制(通过实现 CallBack),KafkaProducer.send但它不起作用并且不调用onCompletion方法。

当我关闭 Kafka 服务器并尝试生成消息时,它会调用回调方法。

有没有其他方法可以得到认可?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`

我正在使用带有代理版本 0.8.2 的 kafka-client api 0.9.1。

4

3 回答 3

4

KafkaProducer 使用 kafka 0.9 版本发布消息后,有一种简单的方法可以从代理获取信息。您可以调用 get() 方法,该方法将返回一个 RecordMetadata 对象,您可以在下面的代码片段中获取偏移量、topicPartition 等信息作为示例:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
                        topic, key.getBytes("UTF-8"), message
                                .getBytes("UTF-8"))).get();
System.out.println("Message produced, offset: " + m.offset());
System.out.println("Message produced, partition : " + m.partition());
System.out.println("Message produced, topic: " + m.topic());
于 2016-02-02T22:45:13.543 回答
4

所以我不能 100% 确定哪些版本适用于 Kafka。目前我使用的是 0.8.2,我知道 0.9 引入了一些重大更改,但我无法确定现在哪些工作/不工作。

一个非常强烈的建议是,我会使用与您的代理版本相对应的 Kafka-Client 版本。如果您使用的是代理 0.8.2,我也会使用 kakfa-client 0.8.2。

你从来没有提供任何关于你如何使用它的代码,所以我只是在黑暗中猜测。但是我已经在 Kafka 0.8.2 中通过在 producer 中使用此方法实现了 Callback 功能。下面是方法签名。

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

我将在哪里调用该方法,我实际上通过覆盖方法传入了类。

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    } else {
      //implement logic here, or call another method to process metadata
      System.out.println("Callback");
    }
  }
}); 

我假设有一种方法也可以像你那样做。但是您必须提供代码来显示您实际上如何将记录发送到 Kafka。除此之外,我只是猜测。

于 2015-12-07T14:01:20.427 回答
3

关闭生产者——一旦你完成了所有消息的发布——就像这样:

producer.close();

这可确保始终调用 Callback 中的以下方法:

onCompletion(RecordMetadata metadata, Exception exception)

注意:我已经通过将该行添加到此示例 Producer 类中进行了测试,并且它可以工作。

于 2016-03-12T09:26:42.457 回答