首先,创建一个Property.java
设置配置并确保将其标记为@Component
private static final String TOPIC = "Kafka_Example";
public Properties settingProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("topic",TOPIC);
return props;
}
public Property() {
}
public void sendMessage(String msg) {
KafkaProducer<String, String> producer =
new KafkaProducer<String, String>(settingProperties());
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(settingProperties().getProperty("topic"),
msg);
producer.send(record);
producer.close();
}
二、在你Controller Class
@Autowired
private Property property;
现在,你终于可以制作自己的方法了
@GetMapping("/publish/{name}")
public String post(@PathVariable("name") final String name) {
property.sendMessage(name);
return "Published successfully";
}
- 确保你的
TOPIC
名字在我的情况下是正确的Kafka_Example
这是您必须运行才能设置的命令
- 终端 1 - 运行 Zookerper:bin/zookeeper-server-start.sh config/zookeeper.properties
- 终端 2 - 运行 Kafka 服务器:bin/kafka-server-start.sh config/server.properties
- 终端 3 - 创建主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
your-topic-name
- 终端 3 - 通过控制台消费:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic --from
your-topic name
-beginning
现在,你可以走了http://localhost:8080/api/publish/<Your-name>
我的 Pom 依赖项
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
添加上述依赖项以使用Producer API
和Consumer API
参考文档
觉得有用就点个赞吧。感谢您宝贵的时间。如果您有任何疑问,请在下面发表评论。