当我产生消息时,我想从经纪人那里得到一些回应。我已经尝试过使用的 CallBack 机制(通过实现 CallBack),KafkaProducer.send
但它不起作用并且不调用onCompletion
方法。
当我关闭 Kafka 服务器并尝试生成消息时,它会调用回调方法。
有没有其他方法可以得到认可?
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Called Callback method");
if (metadata != null) {
System.out.println("message(" + key + ", " + message
+ ") sent to partition(" + metadata.partition() + "), "
+ "offset(" + metadata.offset() + ") in " + elapsedTime
+ " ms");
} else {
exception.printStackTrace();
}
}
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime();
String ip = "192.168.2."+ rnd.nextInt(255);
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`
我正在使用带有代理版本 0.8.2 的 kafka-client api 0.9.1。