5

我想序列化 boost::signals2 信号的多线程调用,以确保有关来自对象的状态更改的通知以明确定义的顺序到达插槽。

背景

我在多线程程序中有一个具有内部状态的对象。内部状态的某些部分对程序的其他部分来说是有趣的,并且对象通过使用 boost::signals2 信号公开状态更改,类似于:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
        }
        m_OnStateChanged(newState);
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
};

问题

如果有多个 OnEvent 处理程序的并发调用,这可能会导致侦听器以不同于实际发生更改的顺序收到有关状态更改的通知。状态本身由上面的互斥锁保护,因此实际状态是明确定义的。但是,互斥锁不能在对信号的调用中保持,因为这可能导致死锁。这意味着信号的实际调用可能以任何顺序发生,而我会要求它们以与实际发生状态更改的顺序相同的顺序被调用。

处理此问题的一种方法是从信号中删除状态,并仅通知侦听器状态已更改。然后,他们可以查询对象的状态,并获取对象在信号触发时的状态或以后的状态。在我的场景中,需要通知侦听器所有状态更改,因此此方法在这里不起作用。

我的下一个方法如下:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        boost::unique_future<void> waitForPrevious;
        boost::shared_ptr<boost::promise<void> > releaseNext;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
            waitForPrevious = m_CurrentInvocation->get_future();
            m_CurrentInvocation.reset(new boost::promise<void>());
            releaseNext = m_CurrentInvocation;
        }
        // Wait for all previous invocations of the signal to finish
        waitForPrevious.get();

        // Now it is our turn to invoke the signal
        // TODO: use try-catch / scoped object to release next if an exception is thrown
        OnStateChanged(newState);

        // Allow the next state change to use the signal
        releaseNext->set_value();
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
    // Initialized with a "fulfilled" promise in the constructor
    // or do special handling of initially empty promise above
    boost::shared_ptr<boost::promise<void> > m_CurrentInvocation;
};

我没有尝试过上面的代码,所以它可能充满了错误和编译错误,但应该可以推断出我所追求的。我的直觉告诉我,我不是第一个遇到此类问题的人,我更喜欢使用经过验证的代码而不是我自己的... :) 所以我的问题是:

是否有一种预先存在的方法来实现对 boost::signals2 信号的序列化调用(例如内置到 signals2 库或通用模式中)?

4

1 回答 1

0

我提出以下解决方案。创建一个待处理信号队列并让一个单独的线程调度它们。代码大致如下所示:

class ObjectWithState {
private:
    bool running;
    std::queue<State> pendingSignals;
    boost::condition_variable cond;
    boost::mutex mut;

    void dispatcherThread()
    {
        while (running)
        {
            /* local copy, so we don't need to hold a lock */
            std::vector<State> pendingSignalsCopy;

            /* wait for new signals, then copy them locally */
            {
                boost::unique_lock<boost::mutex> lock(mut);
                cond.wait(mut);
                pendingSignalsCopy = pendingSignals;
                pendingSignals.clear();
            }

            /* dispatch */
            while (!pendingSignalsCopy.empty())
            {
                State newState = pendingSignalsCopy.front();
                OnStateChanged(newState);
                pendingSignalsCopy.pop();
            }
        }
    }

public:
    void OnEvent()
    {
        State newState;
        ...

        /* add signal to queue of pending signals and wake up dispatcher thread */
        {
            boost::unique_lock<boost::mutex> lock(mut);
            pendingSignals.push(state);
            cond.notify_all();
        }
    }
};
于 2012-04-04T09:22:53.863 回答