0

我对 Springboot 和 Kafka 很陌生。在学校作业中使用 Springboot 应用程序,我们需要在 Kafka 主题上发布 Json 数据。我的发布 .java 文件如下所示:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;

@RestController
@RequestMapping("kafka")
public class UserResource {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;
    private final static String TOPIC = "Kafka";

    @GetMapping("/publish/{name}")
    public String Post(@PathVariable("name") final String name) {

        kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L));
        return "Published successfully";
   }
}

其中 User 只是一个带有构造函数、getter 和 setter 的普通 Java 类。我的配置文件如下所示:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.HashMap;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> produceFactory() {

        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,  "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ,  StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ,  JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
   public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(produceFactory());
    }

}

成功启动 zookeeper 和 Kafka 服务器后,我正在使用以下命令创建一个新主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions    
1 --topic Kafka

然后我使用以下命令启动消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Kafka --from-  
beginning

然后,我可以通过简单地访问 localhost:8081/kafka/publish/Adam 成功地在主题中发布 Json 消息/对象(在这种情况下,正在发布名为 Adam、部门“技术”和薪水 12000 的 Json 对象)

问题/bug:每次发布新名称时,我之前发布的名称都会第二次显示。例如,如果我转到 localhost:8081/kafka/publish/Jim 包含 Adam 的 Json 文件将与 Jim 一起再次发布。此外,如果我重新启动所有服务器,发布新的 Json 数据似乎会调用之前在服务器重新启动之前发布的发布名称。

长话短说,看在线视频,每个出版物一次只能发布一个名字,没有别的。但是,在我的情况下,每个新出版物都会发布重复的值。有人可以指出我正确的方向吗?所有帮助将不胜感激。先感谢您!

4

1 回答 1

0

您正在附加到日志。每个请求都会创建一个全新的消息,而不是覆盖旧消息。

您拥有的事实--from-beginning意味着您将始终从头开始扫描主题的每条消息,而不是等待新消息到达。

如果您确实认为有重复的消息被发布,您可以使用 OffsetShell 列出该主题的最新偏移量

于 2020-03-17T02:49:21.880 回答