我打算将RSocket用于我的通知系统。我想将Spring Boot RSocket 用于我的后端(Java),而对于我的前端,我将使用 Angular 使用 rsocket-js。
我能够快速启动一个请求流交互模型,其中我可以在我的系统中提取所有通知。请参阅我的后端的代码片段:
@MessageMapping("streams")
public Flux<Notification> requestStream() {
log.info("Streaming to notifications...");
return streamEventService.retrieveAllNotifications().log();
}
现在在我的前端,我有以下代码片段:
export class RsocketClientService {
// backend ws endpoint
private readonly wsURL = 'ws://localhost:7000/notification';
client: any;
socket: any
constructor() {
this.client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
keepAlive: 10000,
lifetime: 180000,
dataMimeType: 'application/json',
metadataMimeType: 'message/x.rsocket.routing.v0',
payload: {
data: 23
}
},
transport: new RSocketWebSocketClient({
url: this.wsURL
}),
responder: new EchoResponder()
});
}
public connect() {
console.log("initializeSocket...")
this.client.connect().subscribe({
onComplete: (socket: any) => {
this.socket = socket;
this.socket.connectionStatus().subscribe( (status: any) => {
console.log("Connection status? ", status);
});
},
onError: (error: any) => {
console.error("Connection onError? " + error);
},
onSubscribe: (cancel: any) => {
console.log("Connection onSubscribe? cancel?");
}
});
}
public retrieveNotifications() {
this.socket.requestStream({
data: null,
metadata: String.fromCharCode('streams'.length) + 'streams'
})
.subscribe({
onComplete: () => {
console.log("onComplete?");
},
onError: (error: any) => {
console.error("onError? error: " + error);
},
onNext: (payload: any) => {
console.log("onNext? payload: ", payload);
},
onSubscribe: (subscription: any) => {
console.log("onSubscribe?");
subscription.request(1000000);
},
});
}
我在 UI 中有一个按钮,如果单击该按钮,将调用方法retrieveNotifications ,该方法将订阅我的后端requestStream中的 rsocket 消息映射方法。
一切正常,我可以看到我的回复。现在我的问题是,如果在我的服务器上有一个新数据插入到数据库中,例如,我如何从我的后端服务器发送通知消息到前端说“嘿!新数据已推送到数据库中。” 我有点卡在服务器如何能够以某种方式使用火灾并忘记客户端。