分辨率总结:
在目前的大多数 RSocket 示例中,即使在 SpringBoot 相关教程中,服务器端接受器也只是简单地构造为一个新对象(如下面的 new MqttMessageService())。如果您在接受器类中生成示例内容,这很好,但当接受器依赖容器中的其他 bean 时,可能会导致以下依赖注入相关的混淆。
原始问题:
尝试通过 Rsocket 的 Java 服务器使用 Spring Data Reactive Mongodb 存储库流式传输数据库条目时出现 NullPointerException。
问题在于,在调试期间,所有组件都单独工作:我可以通过同一个 Mongodb 存储库获取请求的数据,我还可以使用 Rsocket 在同一服务器和客户端之间流式传输随机生成的数据。
所以我要么错过了一些非常基本的东西,要么一起使用 Reactive Mongodb 和 Rsocket 可能会出现问题。
这是原始的服务器端 Rsocket 配置:
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
这是具有正确 DI的工作服务器端 Rsocket 配置:
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
这是服务器端 AbstractRSocket 实现,其中在返回 service.findAll() 时抛出 NullPointerException。
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
这是反应式存储库和相关服务。该服务在注入到服务器的 AbstractRSocket 实现时返回 null,但在注入其他类时工作正常:
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
这是与测试内容完美配合的客户端代码:
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
我在这里可能有点超出我的知识水平,并且关于该主题的资源非常有限,所以我很感激任何关于解决方案的提示:)