8

我是 apache kafka 的初学者,并试图学习 confluent - kafka - rest - utils,但是我对如何使用它感到困惑。

在搜索过程中,我发现了这个文档https://spring.io/blog/2015/04/15/using-apache-kafka-for-integration-and-data-processing-pipelines-with-spring

这是非常好的文档,但它对我学习 rest-utils 没有帮助。

confluent kafka rest utils的git代码是

https://github.com/confluentinc/kafka-rest演示了如何使用 rest kafka。但我想知道确切的程序以更加了解它。用一些简单的解释。谁能给我一些关于我如何使用休息客户端的链接。请指导我。

这可能是一个愚蠢的问题,但我没有其他选择来学习。

提前致谢。

4

2 回答 2

2

首先尝试使用 spring MVC 创建一个 REST 服务,将 kafka 的东西放在一边。

一旦您能够运行“hello world”类型的 REST 服务,然后选择 Kafka Docs。

请参阅有关如何创建 kafka 集群的文档,并运行默认的控制台消费者和生产者程序来检查您的集群。

现在,编写一个主 java 程序并使用Kafka Clients API创建一个 kafka 生产者。请参阅其文档。确保通过主程序发送的消息到达消费者。

现在,将这个主程序的内容注入到rest服务中,这样在请求体中传递的消息现在被传递到kafka集群并可供消费者读取。

希望能帮助到你。

于 2015-07-31T10:56:49.673 回答
0

首先,创建一个Property.java设置配置并确保将其标记为@Component

 private static final String TOPIC = "Kafka_Example";

public Properties settingProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put("topic",TOPIC);
    return props;
}

public Property() {
}

public void sendMessage(String msg) {

    KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(settingProperties());

    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>(settingProperties().getProperty("topic"),
            msg);
    producer.send(record);

    producer.close();

}

二、在你Controller Class

@Autowired
private Property property;

现在,你终于可以制作自己的方法了

@GetMapping("/publish/{name}")
public String post(@PathVariable("name") final String name) {

    property.sendMessage(name);

    return "Published successfully";
}
  • 确保你的TOPIC名字在我的情况下是正确的Kafka_Example

这是您必须运行才能设置的命令

  • 终端 1 - 运行 Zookerper:bin/zookeeper-server-start.sh config/zookeeper.properties
  • 终端 2 - 运行 Kafka 服务器:bin/kafka-server-start.sh config/server.properties
  • 终端 3 - 创建主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topicyour-topic-name
  • 终端 3 - 通过控制台消费:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic --from your-topic name-beginning

现在,你可以走了http://localhost:8080/api/publish/<Your-name>

我的 Pom 依赖项

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

添加上述依赖项以使用Producer APIConsumer API

参考文档

觉得有用就点个赞吧。感谢您宝贵的时间。如果您有任何疑问,请在下面发表评论。

于 2020-07-01T08:24:27.500 回答