2

我已经在 Kotlin 中使用 kafka-streams 成功设置了 spring-cloud-function 并拥有一个工作原型。

我有一个函数可以接收来自一个主题的事件并为另一个主题生成另一个事件。

这是我的application.yml

spring:
  cloud:
    stream:
      bindings:
        consumeAndProduce-in-0:
          destination: topicToConsumeFrom
        consumeAndProduce-out-0:
          destination: topicToProduceTo
      kafka:
        binder:
          brokers: ${kafka.broker.prod}
      default:
        group: ${spring.application.name}
    function:
      definition: consumeAndProduce

我的 kotlin 应用程序如下所示:

@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<Application>(*args)
        }
    }

    @Bean
    fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
        return { message ->
            doSomethingAndReturnProducerEvent(message)
        }
    }
}

我在互联网上关注了许多例子,它就像一个魅力。只要将消息放入topicToConsumeFrom,就会调用该函数并将结果写入topicToProduceTo.


现在我的问题是: 如果我的函数并不总是产生某些东西,那么正确的处理方式是什么。服务侦听主题并忽略消息是一个非常常见的用例。它应该只对特定消息做出反应,然后才产生输出,否则什么也不做。我通过创建一个不同的生产者函数来尝试这一点,只有在适用的情况下我才会从消费者那里调用它:

这是我的改编application.yml,现在定义了两个不同的功能consumeproduce

spring:
  cloud:
    stream:
      bindings:
        consume-in-0:
          destination: topicToConsumeFrom
        produce-out-0:
          destination: topicToProduceTo
      kafka:
        binder:
          brokers: ${kafka.broker.prod}
      default:
        group: ${spring.application.name}
    function:
      definition: consume;produce

在我的应用程序中,我重命名consumeAndProduce()consume(),返回值为 now Unit。此外,我创建了第二个 bean produce(),它只返回一个函数,该函数返回我提供给它的有效负载:

@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<Application>(*args)
        }
    }

    @Bean
    fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
        return { message ->
            if (isSomethingIWantToReactTo(message)) {
                val result = doSomethingAndReturnProduceEvent(message)
                producer(result)
            }
        }
    }

    @Bean
    fun produce(): (ProducerEvent) -> ProducerEvent {
        return { it }
    }
}

现在,consume()如果主题中存在消息并且消息是给我的,则调用 -function,我会做一些事情并produce()用我的结果调用 -function,否则什么也不做。

我无法在传出主题中看到任何消息。我通过调试知道它进入了 if-branch 并produce()使用我的结果调用,但似乎 kafka 绑定不起作用,并且没有真正发送任何消息。

我知道,我的方法有点幼稚,但经过广泛的研究,我找不到任何描述这个用例的东西。所有的例子总是有一个在所有情况下消费和产生的函数。

有正确的方法吗?

4

1 回答 1

2

基于第一个代码示例,有两种可能的解决方案,仅使用一个函数consumeAndProduce()进行消费和生产:

  1. 返回一个可为空的事件。如果函数返回null,则不会产生任何事件:
@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? {
    return { message ->
        if(isSomethingIWantToReactTo(message)) {
            doSomethingAndReturnProducerEvent(message)
        }
        null
    }
}
  1. 使用KStreams 消费事件流并产生事件流。这样,任何不适用的传入事件都可以简单地被过滤:
@Bean
fun consumeAndProduce(): (KStream<ConsumerEvent>) -> KStream<ProducerEvent> {
    return { messages ->
        messages
            .filter(message -> isSomethingIWantToReactTo(message))
            .map(message -> doSomethingAndReturnProducerEvent(message))
    }
}

注意:我没有遵循这种方法,也没有尝试过,因此建议对 KStreams 进行研究。

于 2020-07-31T09:48:24.427 回答