0

我想要一个连接到特定队列/主题的绑定,并根据特定的标题条目路由到正确的函数。

我找不到这种情况的任何例子。我尝试了几种方法,但没有一种方法能成功。

例如,这不起作用:

spring:
  cloud:
    function:
      routing:
        enabled: true
    stream:
      function:
        routing:
          enabled: true
        definition: myConsumer;myOtherConsumer;
        bindings:
          myConsumer-in-0:
            destination: myTopic
            group:  myGroup
            binder: myBroker
            routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
          myOtherConsumer-in-0: #without specific routing

每一个具体的例子都值得赞赏

4

4 回答 4

0

消费者不“路由”消息,他们从队列中消费。生产者使用s.c.s.rabbit.bindings.producer-out-0.producer.routing-key-expression.

于 2020-10-19T14:58:04.137 回答
0

我终于找到了实现目标的方法。但我不确定这是否是这样做的方法:

    spring:
      cloud:
        function:
          routing:
            enabled: true
          routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
        stream:
          function:
            definition: myConsumer;myOtherConsumer;
            bindings:
              myConsumer-in-0:
                destination: myTopic
                group:  myGroup
                binder: myBroker
              myOtherConsumer-in-0: #without specific routing

使用以下豆类:

@Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
        return message -> {
           LOG.info("Sending to routingFunction");
           routingFunction.apply(message);
        };
}

@Bean
public Consumer<byte[]> evenConsumer() {
      return (payload) -> LOG.info("even got: {}", new String(payload));
}

@Bean
public Consumer<byte[]> oddConsumer() {
    return (payload) -> LOG.info("odd got: {}", new String(payload));
}
于 2020-10-20T15:09:17.443 回答
0

您实际上不需要spring.cloud.stream.function.routing.enabled=true在文件中提供参数application.properties以使路由工作,因为它会在您提供routing-expression参数后自动工作 - 请参阅:spring cloud stream documentation

于 2021-07-04T21:36:23.680 回答
0

要启用路由,默认情况下会创建名为 functionRouter 的绑定。

根据文档:

RoutingFunction 注册在 FunctionCatalog 中,名称为 functionRouter。为了简单和一致,您还可以参考 RoutingFunction.FUNCTION_NAME 常量。

下面的配置应该可以正常工作:

spring:
  cloud:
    stream:
      function:
        definition: functionRouter;
        routing:
          enabled: true
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        functionRouter-in-0:
          destination: my.topic
          group: my.topic.group
    function:
      routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"

您不需要创建偶数和奇数消费者函数定义。

于 2021-04-29T11:15:48.450 回答