Github 上提供的示例项目:https ://github.com/codependent/rsocket-rating-service
Spring Boot RSocket 服务器消息映射需要一个 requestResponse 请求,返回一个简单的 POJO:
@MessageMapping("request-rating")
fun getRatingWebSocket(ratingRequest: RatingRequest): Mono<Rating> {
return Mono.just(Rating(ratingRequest.songId, (0..10).random())).log()
.doOnNext {
logger.info("Next {}", it)
}
.doOnCancel {
logger.info("Cancel")
}
.doOnSuccess {
logger.info("Success {}", it)
}
.doOnError { throwable ->
logger.error("Error {}", throwable)
}
.doOnTerminate { logger.info("Terminate") }
}
在客户端,我有以下 JS 代码连接到 RSocket 服务器并请求一个值:
const {
RSocketClient,
JsonSerializer,
IdentitySerializer,
} = require('rsocket-core');
const RSocketWebSocketClient = require('rsocket-websocket-client').default;
const route = 'request-rating';
let client = undefined;
let rSocket = undefined;
function main() {
if (client !== undefined) {
client.close();
}
client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
// ms btw sending keepalive to server
keepAlive: 60000,
// ms timeout if no keepalive response
lifetime: 180000,
// format of `data`
dataMimeType: 'application/json',
// format of `metadata`
metadataMimeType: 'message/x.rsocket.routing.v0',
},
transport: new RSocketWebSocketClient({
url: 'ws://localhost:8080/rating-ws'
}),
});
// Open the connection
client.connect().subscribe({
onComplete: socket => {
// socket provides the rsocket interactions fire/forget, request/response,
// request/stream, etc as well as methods to close the socket.
rSocket = socket;
},
onError: error => {
console.log("Connection has been refused due to ", error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
}
});
document.getElementById('sendButton').addEventListener('click', requestRating);
}
function requestRating() {
rSocket.requestResponse({
data: {
'songId': document.getElementById("songId").value
},
metadata: String.fromCharCode(route.length) + route
}).subscribe({
onComplete: () => {
console.log('Complete')
},
onError: error => {
console.log("Connection has been closed due to " + error);
},
onNext: payload => {
console.log(payload.data);
},
onSubscribe: subscription => {
//subscription.request(1)
console.log("Subscribed")
}
});
}
document.addEventListener('DOMContentLoaded', main);
索引.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Rating Service</title>
<link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
<script src="/webjars/jquery/jquery.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>
<script src="bundle.js"></script>
</head>
<body>
<div>Request your rating</div>
<div>
<label> ClientId:
<input id="songId" type="text" name="songId"/>
</label>
</div>
<div><input id="sendButton" type="button" name="send" value="Send"/></div>
</body>
</html>
访问http://localhost:8080/index.html后输入一些文本并推送发送。
请求到达记录正确生成 onNext 值的服务器:
[ctor-http-nio-6] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ctor-http-nio-6] reactor.Mono.Just.1 : | request(1)
[ctor-http-nio-6] reactor.Mono.Just.1 : | onNext(Rating(songId=asdfas, value=0))
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Next Rating(songId=asdfas, value=0)
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Success Rating(songId=asdfas, value=0)
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Terminate
[ctor-http-nio-6] reactor.Mono.Just.1 : | onComplete()
但是在客户端
onNext: payload => {
console.log(payload.data);
},
永远不会被调用,浏览器日志只显示:
Subscribed
Complete
为什么它没有从服务器获取生成的值?