我的问题:
我已经在后端(java spring boot)和前端(React.js)上实现了 RSockets。
我的前端和后端能够通信。但是,我想知道是否可以从我的后端向多个连接的客户端广播消息。
这可能吗?
问问题
98 次
1 回答
0
对的,这是可能的。
这是一个使用 spring webflux 和 kotlin 协程的 rsocket 端点的示例,它将事件多播给它的所有观察者。
这使用了 MutableSharedFlow,但这可以通过rxjava3中的 PublishSubject或Flowable来实现。
在此示例中,rsocket 服务器每 30 秒向所有连接的客户端广播一个时间戳。
控制器:
@Controller
class ApiResource(
private val chatManager: ChatManager
) {
@MessageMapping("broadcast")
suspend fun broadcast(): Flow<Message> =
chatManager.broadcastStream()
}
流管理器以及演示发布者:
@Service
class ChatManager {
val broadcast = MutableSharedFlow<Message>()
fun broadcastStream() = broadcast
@Scheduled(fixedRate = 30 * 1000)
fun sendBroadcast() = runBlocking{
broadcast.emit(
Message(
"SpringServer",
System.currentTimeMillis().toString(),
"broadcast"
)
)
}
}
反应组件BroadcastApp.tsx
:
import React, {useEffect, useState} from 'react';
import {
BufferEncoders, encodeAndAddCustomMetadata, encodeAndAddWellKnownMetadata,
MESSAGE_RSOCKET_COMPOSITE_METADATA, MESSAGE_RSOCKET_ROUTING,
RSocketClient,
toBuffer, createBuffer
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string // message/x.rsocket.composite-metadata.v0
const dataMimeType = 'application/json';
const websocketEndpoint = 'ws://localhost:5051/rsocket';
const transport = new RSocketWebSocketClient({
url: websocketEndpoint,
}, BufferEncoders);
const endpoint = 'broadcast';
interface Message {
senderId: string;
text: string;
channelId: string;
}
type CONNECT_STATUS = 'disconnected' | 'connecting' | 'connected';
function BroadcastApp() {
const defaultMetadata = encodeAndAddWellKnownMetadata(
createBuffer(0),
MESSAGE_RSOCKET_ROUTING,
toBuffer(String.fromCharCode(endpoint.length) + endpoint)
)
const constructMetadataWithChannelId = (cid: string) => encodeAndAddCustomMetadata(
defaultMetadata,
'messaging/x.chat.client-id',
toBuffer(String.fromCharCode(cid.length) + cid)
)
const client = new RSocketClient({
setup: {
keepAlive: 60000,
lifetime: 180000,
dataMimeType,
metadataMimeType,
},
transport,
});
const [connectStatus, setConnectStatus] = useState<CONNECT_STATUS>('disconnected');
const [msgs, setMsgs] = useState<string[]>([]);
useEffect(() => {
if (connectStatus === 'connecting') {
console.log(`rsocket client connecting...`);
client
.connect()
.subscribe({
onError: error => {
console.log(`error: client connect: ${error}`)
setConnectStatus('disconnected');
},
onSubscribe: cancel => {},
onComplete: (sock) => {
sock.requestStream({
metadata: constructMetadataWithChannelId('broadcast')
}).subscribe({
onSubscribe: (subscription) => {
console.log(`rsocket client connected ✅`);
setConnectStatus('connected');
subscription.request(1000)
},
onNext: (event:any) => {
console.log(`received event from channel: ${JSON.stringify(event)}`);
const value = JSON.parse(event.data) as Message;
setMsgs(prev => [value.text, ...prev]);
},
onError: (error) => {
console.log(`err with rsocket subscribe: ${error}`)
}
});
}
});
}
}, [connectStatus])
const handleConnect = () => {
setConnectStatus('connecting');
};
const handleDisconnect = () => {
alert('todo: implement disconnect');
}
return (
<div style={{ padding: 20}}>
<div>
{(connectStatus === 'connected') ? (
<div style={{margin: 10, fontSize: 18}}>
<button
onClick={handleDisconnect}
style={{
padding: 10,
borderRadius: 10,
fontSize: 28,
margin: 10,
}}
>
Disconnect
</button>
</div>
) : (<div>
{connectStatus === 'disconnected' ? (<button
onClick={handleConnect}
style={{
padding: 10,
borderRadius: 10,
fontSize: 28,
}}
>
Connect
</button>) : (
<></>
)}
</div>)}
</div>
<div>
{msgs.map(item => (
<div
style={{
backgroundColor: 'lightgreen',
fontSize: 18,
width: 300,
padding: 10,
borderRadius: 10,
margin: 10,
}}>
{item}
</div>
))}
</div>
</div>
);
}
export default BroadcastApp;
于 2021-12-17T06:41:43.690 回答