1

我的问题:
我已经在后端(java spring boot)和前端(React.js)上实现了 RSockets。
我的前端和后端能够通信。但是,我想知道是否可以从我的后端向多个连接的客户端广播消息。
这可能吗?

4

1 回答 1

0

对的,这是可能的。

这是一个使用 spring webflux 和 kotlin 协程的 rsocket 端点的示例,它将事件多播给它的所有观察者。

这使用了 MutableSharedFlow,但这可以通过rxjava3中的 PublishSubject或Flowable来实现。

在此示例中,rsocket 服务器每 30 秒向所有连接的客户端广播一个时间戳。

控制器

@Controller
class ApiResource(
    private val chatManager: ChatManager
) {
    @MessageMapping("broadcast")
    suspend fun broadcast(): Flow<Message> =
        chatManager.broadcastStream()
}

流管理器以及演示发布者

@Service
class ChatManager {
    val broadcast = MutableSharedFlow<Message>()
    fun broadcastStream() = broadcast

    @Scheduled(fixedRate = 30 * 1000)
    fun sendBroadcast() = runBlocking{
        broadcast.emit(
            Message(
                "SpringServer",
                System.currentTimeMillis().toString(),
                "broadcast"
            )
        )
    }
}

反应组件BroadcastApp.tsx

import React, {useEffect, useState} from 'react';
import {
    BufferEncoders, encodeAndAddCustomMetadata, encodeAndAddWellKnownMetadata,
    MESSAGE_RSOCKET_COMPOSITE_METADATA, MESSAGE_RSOCKET_ROUTING,
    RSocketClient,
    toBuffer, createBuffer
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string // message/x.rsocket.composite-metadata.v0
const dataMimeType = 'application/json';
const websocketEndpoint = 'ws://localhost:5051/rsocket';
const transport = new RSocketWebSocketClient({
    url: websocketEndpoint,
}, BufferEncoders);

const endpoint = 'broadcast';

interface Message {
    senderId: string;
    text: string;
    channelId: string;
}

type CONNECT_STATUS = 'disconnected' | 'connecting' | 'connected';

function BroadcastApp() {
    const defaultMetadata = encodeAndAddWellKnownMetadata(
        createBuffer(0),
        MESSAGE_RSOCKET_ROUTING,
        toBuffer(String.fromCharCode(endpoint.length) + endpoint)
    )

    const constructMetadataWithChannelId = (cid: string) => encodeAndAddCustomMetadata(
        defaultMetadata,
        'messaging/x.chat.client-id',
        toBuffer(String.fromCharCode(cid.length) + cid)
    )

    const client = new RSocketClient({
        setup: {
            keepAlive: 60000,
            lifetime: 180000,
            dataMimeType,
            metadataMimeType,
        },
        transport,
    });

    const [connectStatus, setConnectStatus] = useState<CONNECT_STATUS>('disconnected');
    const [msgs, setMsgs] = useState<string[]>([]);

    useEffect(() => {
        if (connectStatus === 'connecting') {
            console.log(`rsocket client connecting...`);
            client
                .connect()
                .subscribe({
                    onError: error => {
                        console.log(`error: client connect: ${error}`)
                        setConnectStatus('disconnected');
                    },
                    onSubscribe: cancel => {},
                    onComplete: (sock) => {
                        sock.requestStream({
                            metadata: constructMetadataWithChannelId('broadcast')
                        }).subscribe({
                            onSubscribe: (subscription) => {
                                console.log(`rsocket client connected ✅`);
                                setConnectStatus('connected');
                                subscription.request(1000)
                            },
                            onNext: (event:any) => {
                                console.log(`received event from channel: ${JSON.stringify(event)}`);
                                const value = JSON.parse(event.data) as Message;
                                setMsgs(prev => [value.text, ...prev]);
                            },
                            onError: (error) => {
                                console.log(`err with rsocket subscribe: ${error}`)
                            }
                        });
                    }
                });
        }
    }, [connectStatus])
    const handleConnect = () => {
        setConnectStatus('connecting');
    };
    const handleDisconnect = () => {
        alert('todo: implement disconnect');
    }
    return (
        <div style={{ padding: 20}}>

            <div>
                {(connectStatus === 'connected') ? (
                    <div style={{margin: 10, fontSize: 18}}>
                        <button
                            onClick={handleDisconnect}
                            style={{
                                padding: 10,
                                borderRadius: 10,
                                fontSize: 28,
                                margin: 10,
                            }}
                        >
                            Disconnect
                        </button>
                    </div>
                ) : (<div>
                    {connectStatus === 'disconnected' ? (<button
                        onClick={handleConnect}
                        style={{
                            padding: 10,
                            borderRadius: 10,
                            fontSize: 28,
                        }}
                    >
                        Connect
                    </button>) : (
                        <></>
                    )}
                </div>)}

            </div>
            <div>
                {msgs.map(item => (
                    <div
                        style={{
                            backgroundColor: 'lightgreen',
                            fontSize: 18,
                            width: 300,
                            padding: 10,
                            borderRadius: 10,
                            margin: 10,
                        }}>
                        {item}
                    </div>
                ))}
            </div>
        </div>
    );
}

export default BroadcastApp;
于 2021-12-17T06:41:43.690 回答