我已经开始涉足 Spring Cloud Stream 和它的云功能支持。
我在这里上传了一个示例项目来阐明这个问题-> https://github.com/nmarquesantos/spring-cloud-function-kafka
我有一个项目以反应方式公开一些功能,使用 kafka 作为消息代理。
该函数接收一个 Flux 并继续通过 Reactive Mongo 库保存元素。然后它通过另一个通量返回更新的资源。
@Service
public class ExampleCloudFunction {
@Autowired
private PlayerRepository playerRepository;
@Bean
public Function<Flux<Player>, Flux<Player>> playerUpdate() {
return flux -> flux.flatMap(player -> playerRepository.save(player)).log("Saved player");
}
@PollableBean
public Supplier<Flux<Player>> playerFeeder() {
return () -> Flux.just(new Player(UUID.randomUUID().toString(), "Ronaldo"));
}
}
playerUpdate 函数是错误发生的地方。playerFeeder 只是我创建的一个函数,用于发送数据以重现问题。在现实生活中,这将来自不同的服务。
通过运行我上面提到的示例项目,这是错误的片段:
2020-05-21 22:13:40.842 ERROR 1884 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@2c3e726}
org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context
2020-05-21 22:13:41.853 ERROR 1884 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@2c3e726}
org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context
我很难理解我做错了什么,在我的搜索中找不到太多信息。