3

RSocketResumableTransport 似乎不起作用。我有以下代码,其中使用了 RSocketResumableTransport,但它始终无法连接到服务器(spring-boot 版本 2.4.0-M1)。我使用的是 "rsocket-websocket-client": "0.0.19" 版本。下面是用于连接的类:

import { Injectable } from '@angular/core';
import { IdentitySerializer, JsonSerializer, RSocket, RSocketClient, RSocketResumableTransport } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

/**
 * Root-scoped Angular service that opens a resumable RSocket connection over
 * WebSocket and hands the connected socket to the rest of the application.
 *
 * The connection attempt is started from the constructor. While the transport
 * reports `NOT_CONNECTED` after the first attempt, a reconnect is scheduled
 * after a short fixed delay until `disconnectClient()` is called.
 */
@Injectable({ providedIn: 'root' })
export class RSocketResumableConnector {
    /** Pause between reconnect attempts, so a refused connection does not spin in a hot loop. */
    private static readonly RECONNECT_DELAY_MS = 1000;

    private client: RSocketClient;
    private rsocket: RSocket;
    /** Handle of the pending reconnect timer, if one is scheduled. */
    private reconnectTimer: ReturnType<typeof setTimeout> | undefined;

    constructor() {
        console.log('RSocketResumableConnector loaded...');
        this.createConnection();
    }

    /**
     * Builds the resumable transport and the RSocket client, subscribes to the
     * transport's status stream to drive reconnects, and starts connecting.
     *
     * Calling this again overwrites `client` without closing the previous one;
     * call `disconnectClient()` first when re-creating the connection.
     */
    public createConnection() {
        // NOTE(review): `Buffer` is a Node.js global; confirm a polyfill is
        // available in the browser bundle.
        const resumeToken = Buffer.from('tweet.all');
        const bufferSize = 128;
        const sessionDurationSeconds = 20;

        // NOTE(review): resumption presumably also has to be enabled on the
        // server side; confirm the server configuration accepts resume tokens.
        const resumableTransport = new RSocketResumableTransport(
            () =>
                new RSocketWebSocketClient({
                    url: 'ws://localhost:7000/tweetsocket',
                }),
            {
                bufferSize, // max number of sent & pending frames to buffer before failing
                resumeToken, // unique identifier of the session across connections
                sessionDurationSeconds,
            }
        );

        // Routing metadata is the route name prefixed by one character whose
        // char code is the route's length.
        const setupRoute = 'shell-client';

        // Create an instance of a client
        this.client = new RSocketClient({
            // send/receive objects instead of strings/buffers
            serializers: {
                data: JsonSerializer,
                metadata: IdentitySerializer,
            },
            setup: {
                payload: {
                    data: 'angular-client',
                    metadata: String.fromCharCode(setupRoute.length) + setupRoute,
                },
                dataMimeType: 'application/json',
                metadataMimeType: 'message/x.rsocket.routing.v0',
                keepAlive: 5000,
                lifetime: 60000,
            },
            transport: resumableTransport,
        });

        // The first NOT_CONNECTED status is skipped: presumably it is the
        // transport's initial state rather than a dropped connection.
        let isInitialStatus = true;
        resumableTransport.connectionStatus().subscribe({
            onNext: (status) => {
                console.log('Resumable transport status changed: ' + status.kind);

                if (status.kind !== 'NOT_CONNECTED') {
                    return;
                }
                if (isInitialStatus) {
                    isInitialStatus = false;
                    return;
                }

                console.log('Resumable transport disconnected, retrying...');
                // Keep the timer handle so disconnectClient() can cancel a pending retry.
                this.reconnectTimer = setTimeout(() => {
                    this.reconnectTimer = undefined;
                    resumableTransport.connect();
                }, RSocketResumableConnector.RECONNECT_DELAY_MS);
            },
            onSubscribe: (resumeSubscription) => {
                resumeSubscription.request(Number.MAX_SAFE_INTEGER);
            },
        });

        // Now create a socket. The result is delivered through onComplete /
        // onError only, so no onNext handler is registered.
        this.client.connect().subscribe({
            onComplete: (socket: RSocket) => {
                console.log('socket in created', socket);
                this.rsocket = socket;
            },
            onError: (error) => console.error('Connection has been refused due to', error),
            onSubscribe: () => {
                console.log('client subscription done..');
            },
        });
    }

    /**
     * Cancels any pending reconnect attempt and closes the client, if one was created.
     */
    public disconnectClient() {
        // Cancel the pending retry first so it cannot fire against a closed transport.
        if (this.reconnectTimer !== undefined) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = undefined;
        }
        if (this.client !== undefined) {
            this.client.close();
        }
    }

    /**
     * Returns the connected socket. The field is only assigned once the
     * connection completes, so the value is `undefined` before that.
     */
    public getRSocket(): RSocket {
        return this.rsocket;
    }
}

浏览器控制台中的连续错误如下。此错误不断重复,直到超时:

Resumable transport disconnected, retrying...
Resumable transport status changed: CONNECTING
Resumable transport status changed: NOT_CONNECTED
Resumable transport disconnected, retrying...

如果我哪里做错了,请告诉我。

4

0 回答 0