I am using Spring Boot 2.6.3. This is a Spring WebFlux project that uses RSocket over WebSocket for notifications.
Server side:
build.gradle,
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation 'org.springframework.security:spring-security-messaging'
implementation 'org.springframework.security:spring-security-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
Controller,
@Controller
public class RSocketController {
    private static final List<RSocketRequester> CLIENTS = new ArrayList<>();

    @ConnectMapping
    void onConnect(RSocketRequester rSocketRequester) {
        // Track the requester for the lifetime of the connection.
        rSocketRequester.rsocket().onClose().doFirst(() -> {
            CLIENTS.add(rSocketRequester);
        }).doOnError(error -> {
        }).doFinally(consumer -> {
        }).subscribe();
        // Send a status message back to the newly connected client.
        rSocketRequester.route("client-status").data("Success!").send().subscribe();
    }

    @MessageMapping("send-notification")
    public void sendNotification(Mono<String> notification) {
        notification.subscribe(message -> {
            LOGGER.info("" + message);
        }, e -> LOGGER.error(e.getMessage(), e), () -> LOGGER.info("completed without a value."));
    }

    @GetMapping("/notify")
    public void giveCallback() {
        CLIENTS.get(0).route("callback-notification").data("CALLBACK TEST").send().subscribe();
    }
}
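As a side note on the controller above: CLIENTS is a plain ArrayList that is modified from connection callbacks, and CLIENTS.get(0) throws if no client is connected. A minimal sketch of a thread-safe alternative is shown below. ClientRegistry and its method names are hypothetical and are not part of the original project; the type parameter T stands in for RSocketRequester so that the sketch compiles without Spring. It is unrelated to the routing problem described in this question.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper (not from the original post): a thread-safe registry
// of connected clients. T would be RSocketRequester in the controller above.
final class ClientRegistry<T> {

    // CopyOnWriteArrayList tolerates concurrent add/remove from different threads.
    private final List<T> clients = new CopyOnWriteArrayList<>();

    // Intended to be called when a connection is established,
    // for example from the @ConnectMapping method.
    void register(T client) {
        clients.add(client);
    }

    // Intended to be called when the connection closes,
    // for example from doFinally on rsocket().onClose().
    boolean unregister(T client) {
        return clients.remove(client);
    }

    // Returns the first connected client, or an empty Optional when none
    // is connected, instead of throwing like CLIENTS.get(0) would.
    Optional<T> first() {
        return clients.stream().findFirst();
    }

    int size() {
        return clients.size();
    }
}
```

With such a registry, the /notify handler could use first().ifPresent(...) so that it does nothing when no client is connected.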
Client side:
class ClientHandler {
    @MessageMapping("client-status")
    public void statusUpdate(String status) {
        System.out.println("Called from server ========== -> Connection " + status);
    }

    @MessageMapping("callback-notification")
    public void requestResponse(String callback) {
        System.out.println("Received callback from server, request: " + callback);
    }
}

@Controller
public class ClientController {
    private final RSocketRequester rSocketRequester;

    public ClientController(@Autowired RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
        String client = UUID.randomUUID().toString();
        // Register ClientHandler as the client-side responder for server-initiated requests.
        SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
        this.rSocketRequester = rsocketRequesterBuilder
                .rsocketStrategies(strategies).rsocketConnector(connector -> {
                    connector.acceptor(responder);
                })
                .websocket(URI.create("ws://localhost:9091/rsocket/"));
        this.rSocketRequester.route("send-notification").data("Test Send").send().subscribe();
    }
}
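Here, from the client side:
- The client is able to connect to the server.
- The @ConnectMapping method is invoked on the server for the new connection. It sends the status message to the client, and I can see that message on the client.
- The server-side callback notification method (giveCallback) is able to send a notification to the client, and I can see the message on the client.
However, calling send-notification from the client does not execute the sendNotification method on the server.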
this.rSocketRequester.route("send-notification").data("TEST Server Notification").send().subscribe();
Here the client responder is working, but the server responder is not. Does the server responder need any special configuration?