RSocketResumableTransport 似乎不起作用。我有以下代码,我正在使用 RSocketResumableTransport 但它从不连接到服务器(spring-boot 版本 2.4.0-M1)。我正在使用 rsocket-websocket-client": "0.0.19" 版本。下面是用于连接的类:
import { Injectable } from '@angular/core';
import { IdentitySerializer, JsonSerializer, RSocket, RSocketClient, RSocketResumableTransport } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
@Injectable({ providedIn: 'root' })
export class RSocketResumableConnector {
private client: RSocketClient;
private rsocket: RSocket;
constructor() {
console.log('RSocketResumableConnector loaded...');
this.createConnection();
}
public createConnection() {
const resumeToken = Buffer.from('tweet.all');
const bufferSize = 128;
const sessionDurationSeconds = 20;
const resumableTransport = new RSocketResumableTransport(
() =>
new RSocketWebSocketClient({
url: 'ws://localhost:7000/tweetsocket',
}),
{
bufferSize, // max number of sent & pending frames to buffer before failing
resumeToken, // unique identifier the session across connections
sessionDurationSeconds,
}
);
// Create an instance of a client
this.client = new RSocketClient({
// send/receive objects instead of strings/buffers
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer,
},
setup: {
payload: {
data: 'angular-client',
metadata: String.fromCharCode('shell-client'.length) + 'shell-client',
},
dataMimeType: 'application/json',
metadataMimeType: 'message/x.rsocket.routing.v0',
keepAlive: 5000,
lifetime: 60000,
},
transport: resumableTransport,
});
// start resumableTransport
let start = true;
resumableTransport.connectionStatus().subscribe({
onNext: (status) => {
console.log('Resumable transport status changed: ' + status.kind);
if (status.kind === 'NOT_CONNECTED') {
if (!start) {
console.log('Resumable transport disconnected, retrying...');
setTimeout(() => resumableTransport.connect());
} else {
start = false;
}
}
},
onSubscribe: (resumeSubscription) => {
resumeSubscription.request(Number.MAX_SAFE_INTEGER);
},
});
setTimeout(() => {}, 30000000);
// now create a socket
this.client.connect().subscribe({
onComplete: (socket: RSocket) => {
console.log('socket in created', socket);
this.rsocket = socket;
},
onError: (error) => console.error('Connection has been refused due to', error),
onSubscribe: (cancel) => {
console.log('client subscription done..');
},
onNext: (data) => {
console.log('data is', data);
},
});
}
public disconnectClient() {
if (this.client !== undefined) {
this.client.close();
}
}
public getRSocket(): RSocket {
return this.rsocket;
}
}
浏览器控制台中的连续错误如下。此错误不断重复,直到超时:
Resumable transport disconnected, retrying...
Resumable transport status changed: CONNECTING
Resumable transport status changed: NOT_CONNECTED
Resumable transport disconnected, retrying...
如果我做错了什么,请告诉我?