我正在尝试使用 rsocket-websocket-client 构建一个聊天前端。我可以使用从前端发送消息requestChannel(new Flowable(source...))
并使用requestChannel(new Flowable.just({metatdata}))
.
我试图使用FlowableProcessor
将两个调用减少requestChannel
为一个。
找不到有关FlowableProcessor
rsocket 的文档。
这是我的尝试:
const processor = new FlowableProcessor(
new Flowable(source => {
source.onSubscribe({
cancel: () => {},
request: n => {}
});
source.onNext({
metadata: constructMetadataWithChannelId(channelId),
});
})
);
sock.requestChannel(processor.map(item => item))
.subscribe({
onComplete: () => {
console.log(
`complted subscribe`,
);
},
onError: error1 => {
console.log(
`subscriber err: ${error1}`,
);
},
onSubscribe: subscription => {
console.log(
`onSubscribe`,
);
setConnectStatus('connected');
setChannelIdDone(true);
subscription.request(1000);
},
onNext: (val: any) => {
const value = JSON.parse(val) as Message;
console.log(
`received event from channel: ${JSON.stringify(
value,
)}`,
);
}
})
我明白这是类型问题。无法弄清楚哪里processor.map(item => item)
出错了。
TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take