我试图产生一些消息并放入主题,然后从控制台消费者那里获取相同的信息。
使用的代码:
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class SimpleProducer {
private static Producer<String,String> producer;
public SimpleProducer() {
Properties props = new Properties();
// Set the broker list for requesting metadata to find the lead broker
props.put("metadata.broker.list","172.22.96.56:9092,172.22.96.56:9093,172.22.96.56:9094");
//This specifies the serializer class for keys
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 1 means the producer receives an acknowledgment once the lead replica
// has received the data. This option provides better durability as the
// client waits until the server acknowledges the request as successful.
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}
public static void main(String[] args) {
int argsCount = args.length;
if (argsCount == 0 || argsCount == 1)
throw new IllegalArgumentException(
"Please provide topic name and Message count as arguments");
String topic = (String) args[0];
String count = (String) args[1];
int messageCount = Integer.parseInt(count);
System.out.println("Topic Name - " + topic);
System.out.println("Message Count - " + messageCount);
SimpleProducer simpleProducer = new SimpleProducer();
simpleProducer.publishMessage(topic, messageCount);
}
private void publishMessage(String topic, int messageCount) {
for (int mCount = 0; mCount < messageCount; mCount++) {
String runtime = new Date().toString();
String msg = "Message Publishing Time - " + runtime;
System.out.println(msg);
// Creates a KeyedMessage instance
KeyedMessage<String, String> data =
new KeyedMessage<String, String>(topic, msg);
// Publish the message
producer.send(data);
}
// Close producer connection with broker.
producer.close();
}
}
输出:
主题名称 - 测试消息计数 - 10 log4j:WARN 找不到记录器 (kafka.utils.VerifiableProperties) 的附加程序。log4j:WARN 请正确初始化 log4j 系统。消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 年消息发布时间 - 2 月 16 日星期二:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016 消息发布时间 - 2 月 16 日星期二 02:00:56 IST 2016
从命令行我提供主题名称为“kafkatopic”和消息计数“10”。该程序运行良好,没有异常,但是当我尝试从控制台查看消息时,它们没有出现。主题已创建。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
你能帮忙看看出了什么问题!!