0

我正在使用Spring Cloud Streams 文档来尝试解决如何通过已在 Gradle 中下载的活页夹将我的微服务连接到 Kafka。我尝试@Bean Function<String, String>()在我的 Spring Boot Application 类中创建一个简单的方法,并验证它能够通过使用命令行与 Kafka 进行交互来uppercase-in-0uppercase-out-0主题交互,如文档开头所述,确认应用程序能够与卡夫卡通信。在这一点上,我尝试创建以下类,并期望它将通过自动发现加载:

package com.yuknis.loggingconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

}

package com.yuknis.loggingconsumer.functions;

import java.util.function.Function;

public class CharCounter implements Function<String, Integer> {

    /**
     * Applies this function to the given argument.
     *
     * @param s the function argument
     * @return the function result
     */
    @Override
    public Integer apply(String s) {
        return s.length();
    }

}

使用application.properties这样的文件:

spring.cloud.function.scan.packages:com.yuknis.loggingconsumer.functions

我不是 100% 确定现在应该发生什么,但我假设它应该看到类并自动创建一个我可以消费charcounter-out-0charcounter-in-0发布到的主题,这些主题中的数据通过该函数。这不是正在发生的事情。我可能会错过什么?此类是否应该以与创建主题相同的方式创建主题@Bean

4

1 回答 1

0

即使每个函数都加载了spring.cloud.function.scan.packagesset to a package 并spring.cloud.function.scan.enabled设置为true,它仍然不会创建主题。您仍然需要设置spring.cloud.function.scan.definitionFunction, Consumer,或者Supplier您希望像这样与 Kafka 进行通信:

spring.cloud.function:
  scan:
    enabled: true
    packages: com.yuknis.loggingconsumer.functions
  definition: charCounter;lowercase;uppercase

之后,它将创建charCounter-in-0andcharCounter-out-0主题,如果需要,可以使用spring.cloud.function.charCounter-in-0orspring.cloud.function.charCounter-out-0表达式属性进行映射。

于 2020-03-25T12:59:25.207 回答