0

我已经开始涉足 Spring Cloud Stream 和它的云功能支持。

我在这里上传了一个示例项目来阐明这个问题-> https://github.com/nmarquesantos/spring-cloud-function-kafka

我有一个项目以反应方式公开一些功能,使用 kafka 作为消息代理。

该函数接收一个 Flux 并继续通过 Reactive Mongo 库保存元素。然后它通过另一个通量返回更新的资源。

@Service
public class ExampleCloudFunction {

@Autowired
private PlayerRepository playerRepository;

@Bean
public Function<Flux<Player>, Flux<Player>> playerUpdate() {
    return flux -> flux.flatMap(player -> playerRepository.save(player)).log("Saved player");
}

@PollableBean
public Supplier<Flux<Player>> playerFeeder() {
   return () -> Flux.just(new Player(UUID.randomUUID().toString(), "Ronaldo"));
}
}

playerUpdate 函数是错误发生的地方。playerFeeder 只是我创建的一个函数,用于发送数据以重现问题。在现实生活中,这将来自不同的服务。

通过运行我上面提到的示例项目,这是错误的片段:

2020-05-21 22:13:40.842 ERROR 1884 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@2c3e726}

org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context

2020-05-21 22:13:41.853 ERROR 1884 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@2c3e726}

org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context

我很难理解我做错了什么,在我的搜索中找不到太多信息。

4

1 回答 1

0

Spring for Apache Kafka 目前不支持反应式事务。

于 2020-05-21T14:36:23.077 回答