2

在我的聊天应用程序中使用 redux-saga 生成器时,我无法订阅 socketcluster ( http://socketcluster.io/ ) 频道。socketcluster 后端的设置方式是,任何消息都保存在数据库中,然后发布到接收用户的个人频道,该频道以用户的 id 命名。例如,用户 A 的 id 为“123abc”,并会订阅名为“123abc”的频道以获取他们的实时消息。

下面的代码确实接收到发布到通道的新消息,但它在加载时抛出“TypeError:将循环结构转换为 JSON”,并破坏了我在应用程序中的所有其他 redux-saga 生成器。我已经完成了 Chrome Devtools 的挖掘,我的理论是它与 createChannel 函数中创建的队列有关。此外,我尝试在 subscribeToChannel 函数中返回延迟承诺,但这也导致了循环转换错误,我可以根据要求发布该代码。

我首先提到了这个答案:https ://stackoverflow.com/a/35288877/5068616它帮助我获得了以下代码,但我在互联网上找不到任何类似的问题。还有一点需要注意的是,我正在使用 redux-socket-cluster ( https://github.com/mattkrick/redux-socket-cluster ) 来同步套接字和状态,但我不认为它是问题

sagas.js

export default function* root() {
    yield [
        fork(startSubscription),
    ]
}


function* startSubscription(getState) {
    while (true) {
        const {
            userId
        } = yield take(actions.SUBSCRIBE_TO_MY_CHANNEL);
        yield call(monitorChangeEvents, subscribeToChannel(userId))
    }
}

function* monitorChangeEvents(channel) {
    while (true) {
        const info = yield call(channel.take) // Blocks until the promise resolves
        console.log(info)
    }
}

function subscribeToChannel(channelName) {
    const channel = createChannel();
    const socket = socketCluster.connect(socketConfig);
    const c = socket.subscribe(channelName);
    c.watch(event => {
        channel.put(event)
    })

    return channel;
}

function createChannel() {
    const messageQueue = []
    const resolveQueue = []

    function put(msg) {
        // anyone waiting for a message ?
        if (resolveQueue.length) {
            // deliver the message to the oldest one waiting (First In First Out)
            const nextResolve = resolveQueue.shift()
            nextResolve(msg)
        } else {
            // no one is waiting ? queue the event
            messageQueue.push(msg)
        }
    }

    // returns a Promise resolved with the next message
    function take() {
        // do we have queued messages ?
        if (messageQueue.length) {
            // deliver the oldest queued message
            return Promise.resolve(messageQueue.shift())
        } else {
            // no queued messages ? queue the taker until a message arrives
            return new Promise((resolve) => resolveQueue.push(resolve))
        }
    }

    return {
        take,
        put
    }
}

谢谢您的帮助!

4

0 回答 0