我对 Springboot 和 Kafka 很陌生。在学校作业中使用 Springboot 应用程序,我们需要在 Kafka 主题上发布 Json 数据。我的发布 .java 文件如下所示:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;
@RestController
@RequestMapping("kafka")
public class UserResource {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
private final static String TOPIC = "Kafka";
@GetMapping("/publish/{name}")
public String Post(@PathVariable("name") final String name) {
kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L));
return "Published successfully";
}
}
其中 User 只是一个带有构造函数、getter 和 setter 的普通 Java 类。我的配置文件如下所示:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.HashMap;
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory<String, User> produceFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(produceFactory());
}
}
成功启动 zookeeper 和 Kafka 服务器后,我正在使用以下命令创建一个新主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions
1 --topic Kafka
然后我使用以下命令启动消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Kafka --from-
beginning
然后,我可以通过简单地访问 localhost:8081/kafka/publish/Adam 成功地在主题中发布 Json 消息/对象(在这种情况下,正在发布名为 Adam、部门“技术”和薪水 12000 的 Json 对象)
问题/bug:每次发布新名称时,我之前发布的名称都会第二次显示。例如,如果我转到 localhost:8081/kafka/publish/Jim 包含 Adam 的 Json 文件将与 Jim 一起再次发布。此外,如果我重新启动所有服务器,发布新的 Json 数据似乎会调用之前在服务器重新启动之前发布的发布名称。
长话短说,看在线视频,每个出版物一次只能发布一个名字,没有别的。但是,在我的情况下,每个新出版物都会发布重复的值。有人可以指出我正确的方向吗?所有帮助将不胜感激。先感谢您!