0

我正在尝试使用 rsocket-websocket-client 构建一个聊天前端。我可以使用从前端发送消息requestChannel(new Flowable(source...))并使用requestChannel(new Flowable.just({metatdata})).

我试图使用FlowableProcessor将两个调用减少requestChannel为一个。

找不到有关FlowableProcessorrsocket 的文档。

这是我的尝试:

const processor = new FlowableProcessor(
    new Flowable(source => {
        source.onSubscribe({
            cancel: () => {},
            request: n => {}
        });
        source.onNext({
            metadata: constructMetadataWithChannelId(channelId),
        });
    })
);
sock.requestChannel(processor.map(item => item))
    .subscribe({
        onComplete: () => {
            console.log(
                `complted subscribe`,
            );
        },
        onError: error1 => {
            console.log(
                `subscriber err: ${error1}`,
            );
        },
        onSubscribe: subscription => {
            console.log(
                `onSubscribe`,
            );
            setConnectStatus('connected');
            setChannelIdDone(true);
            subscription.request(1000);
        },
        onNext: (val: any) => {
            const value = JSON.parse(val) as Message;
            console.log(
                `received event from channel: ${JSON.stringify(
                                            value,
                                        )}`,
            );
        }
    })

我明白这是类型问题。无法弄清楚哪里processor.map(item => item)出错了。

TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take
4

1 回答 1

0

错误是微不足道的。FlawableProcessor不能使用,因为它没有实现与Flawable.

目前rsocket-js没有很好地打磨并且有一些缺陷。其中一些缺陷是类型使用不一致。AFAIU 它应该IPublisherISubscriber接口应该在所有其他公共接口中使用。但是为了作者的简单(我猜)它们被替换为FlowableandSingle类型。

根据源代码FlowableProcessor不扩展Flowable而是实现IPublisher,接口本身ISubscriberISubscription没有实现lifttake方法实现Flowable。所以它不能直接使用,Flowable虽然它应该被用作IPublisher.

在您的示例中,我认为没有理由使用FlowableProcessor. 相反,您可以将Flowableused 作为参数传递FlowableProcessorrequestChannel直接构造方法:

const requestSource = new Flowable(source => {
    source.onSubscribe({
        cancel: () => {},
        request: n => {}
    });
    source.onNext({
        metadata: constructMetadataWithChannelId(channelId),
    });
});
sock.requestChannel(requestSource.map(item => item))
    ...

如果您真的需要FlowableProcessor在这段代码中使用处理器,那么您可以强制将其强制转换为,Flowable但它可能会成为未来意外错误的来源:

sock.requestChannel(processor.map(item => item) as any as Flowable)

请同时注意您使用Flowable不正确。当尚未请求数据时,您在订阅时发送数据。这违反了 RSocket 合同。正确的实现应该是这样的:

    let requestsSink: {
        sendRequest(myRequest: unknown): void,
        complete(): void
    };
    const requestsSource = new Flowable((requestsSubscriber) => {
        // Number of the requests requested by subscriber.
        let requestedRequests = 0;
        // Buffer for requests which should be sent but not requested yet.
        const pendingRequests: unknown[] = [];
        let completed = false;

        requestsSink = {
            sendRequest(myRequest: unknown) {
                if (completed) {
                    // It's completed, nobody expects this request.
                    return;
                }
                if (requestedRequests > 0) {
                    --requestedRequests;
                    requestsSubscriber.onNext(myRequest);
                } else {
                    pendingRequests.push(myRequest);
                }
            },
            complete() {
                if (!completed) {
                    completed = true;
                    requestsSubscriber.onComplete();
                }
            },
        };

        requestsSubscriber.onSubscribe({
            cancel: () => {
                // TODO: Should be handled somehow.
            },
            request(n: number) {
                const toSend = pendingRequests.splice(n);
                requestedRequests += n - toSend.length;
                for (const pending of toSend) {
                    requestsSubscriber.onNext(pending);
                }
            }
        });
    });

    sock.requestChannel(requestsSource.map(item => item))
        ...
    
    // Somewhere else the data is provided:
    if (requestsSink != null) {
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.complete();
    }

于 2021-12-30T13:59:29.977 回答