我已经在 Kotlin 中使用 kafka-streams 成功设置了 spring-cloud-function 并拥有一个工作原型。
我有一个函数可以接收来自一个主题的事件并为另一个主题生成另一个事件。
这是我的application.yml
spring:
cloud:
stream:
bindings:
consumeAndProduce-in-0:
destination: topicToConsumeFrom
consumeAndProduce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consumeAndProduce
我的 kotlin 应用程序如下所示:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}
@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
return { message ->
doSomethingAndReturnProducerEvent(message)
}
}
}
我在互联网上关注了许多例子,它就像一个魅力。只要将消息放入topicToConsumeFrom
,就会调用该函数并将结果写入topicToProduceTo
.
现在我的问题是: 如果我的函数并不总是产生某些东西,那么正确的处理方式是什么。服务侦听主题并忽略消息是一个非常常见的用例。它应该只对特定消息做出反应,然后才产生输出,否则什么也不做。我通过创建一个不同的生产者函数来尝试这一点,只有在适用的情况下我才会从消费者那里调用它:
这是我的改编application.yml
,现在定义了两个不同的功能consume
和produce
:
spring:
cloud:
stream:
bindings:
consume-in-0:
destination: topicToConsumeFrom
produce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consume;produce
在我的应用程序中,我重命名consumeAndProduce()
为consume()
,返回值为 now Unit
。此外,我创建了第二个 bean produce()
,它只返回一个函数,该函数返回我提供给它的有效负载:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}
@Bean
fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
return { message ->
if (isSomethingIWantToReactTo(message)) {
val result = doSomethingAndReturnProduceEvent(message)
producer(result)
}
}
}
@Bean
fun produce(): (ProducerEvent) -> ProducerEvent {
return { it }
}
}
现在,consume()
如果主题中存在消息并且消息是给我的,则调用 -function,我会做一些事情并produce()
用我的结果调用 -function,否则什么也不做。
我无法在传出主题中看到任何消息。我通过调试知道它进入了 if-branch 并produce()
使用我的结果调用,但似乎 kafka 绑定不起作用,并且没有真正发送任何消息。
我知道,我的方法有点幼稚,但经过广泛的研究,我找不到任何描述这个用例的东西。所有的例子总是有一个在所有情况下消费和产生的函数。
有正确的方法吗?