1

我有简单的@Bean(Java 8 函数)映射到目标topic-out-in)。

@Bean
public Function<String, String> transform() {
    return payload -> payload.toUpperCase();
}

@Bean
public Consumer<String> receive() {
    return payload -> logger.info("Data received: " + payload);
}

.yml 配置:

spring:
  cloud:
    stream:
      function:
        definition: transform;receive
      bindings:
        transform-out-0:
          destination: myTopic
        receive-in-0:
          destination: myTopic

现在,我想通过调用来调用该transform函数,REST以便它的输出到达destination topic(即transform-out-0映射到)并由来自该目的地(映射到)myTopic的 拾取。基本上,每个 REST 调用都应该生成一个新的KAFKA实例并关闭它。consumerreceive-in-0myTopic Producer

我怎样才能做到这一点,请使用spring-cloud-stream

谢谢

昂舒曼

4

1 回答 1

5

您应该使用StreamBridge而不是拥有该transform功能。这是 Spring Cloud Stream 中动态目的地的新推荐方法。这是基本思想:

@Autowired
private StreamBridge streamBridge;

@RequestMapping
public void delegateToSupplier(@RequestBody String body) {
    streamBridge.send("transform-out-0", body);
}

然后通过配置提供这个属性—— spring.cloud.stream.source: transform

Spring Cloud Stream 将创建一个为您调用的输出绑定transform-out-0。每次调用 REST 端点时StreamBridge,您都​​会将数据发送到目标主题。

有关更多信息,请参阅

于 2020-06-25T14:16:17.617 回答