24

在阅读了一些关于Qt Signal-Slot 通信的文章后,我仍然对排队连接有疑问。

如果我有一些线程一直在相互发送信号,假设一个线程thread_slow在它的事件循环中运行一个慢速方法,另一个thread_fast运行一个快速发送多个信号的方法,而另一个线程仍在运行它是慢速方法...... ..当slow方法从thread_slow返回到事件循环时,它会处理之前发送的所有信号thread_fast还是只处理最后一个(所有信号都是同一类型)?

如果它将处理所有信号,有没有办法让thread_slow 唯一的过程成为最后一个?(考虑到多线程应用程序中的“最后一个”可能是模糊的,为了简单起见,让我们考虑线程请求最后一个信号之前的最后一个信号,因此在线程寻找最后一个信号时发送的新信号可能会丢失)。

(我问这个是因为我有多个线程从多个线程接收数据,我不希望它们处理旧数据,只是发送的最后一个数据)

我已经运行了一些测试,看起来 Qt 将处理所有信号。我做了一个线程:

while(true)
{
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}

另一个插槽可以:

void test::testQueue(int test)
{
    test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}

线程将运行:

while(true)
{
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}

我每 500 毫秒从一个线程向另一个线程发送一个信号,另一个线程休眠 3000 毫秒(3 秒),然后唤醒并将内部变量增加 100。每次执行插槽时,它都会发出一个文本收到的值 + 内部变量。我得到的结果是每次 QCoreApplication::processEvents();被调用,所有信号都被执行......(我编辑了这部分,因为我在之前的代码中发现了一个错误)

4

8 回答 8

14

QCoreApplication QMetaCallEvent 压缩

每个排队的插槽调用最终都会将 a 发布QMetaCallEvent到目标对象。该事件包含发送者对象、信号 id、槽索引和打包的调用参数。在 Qt 5 上,信号 id 通常不等于返回的值QMetaObject::signalIndex():它是一个计算的索引,就好像对象只有信号方法而没有其他方法一样。

目标是压缩此类调用,以便事件队列中对于给定的(发送者对象、发送者信号、接收者对象、接收者槽)元组仅存在一个唯一调用。

这是唯一明智的方法,无需更改源或目标对象,同时保持最小的开销。我的其他答案中的事件循环递归方法每个事件都有严重的堆栈开销,当 Qt 为 64 位指针架构构建时,大约为 1kbyte。

当新事件发布到一个已经发布了一个或多个事件的对象时,可以访问事件队列。在这种情况下,QCoreApplication::postEvent调用QCoreApplication::compressEvent. compressEvent当第一个事件发布到对象时不调用。在此方法的重新实现中,QMetaCallEvent可以检查发布到目标对象的内容以查找对您的插槽的调用,并且必须删除过时的副本。必须包含私有 Qt 头文件才能获得 和的QMetaCallEvent定义。QPostEventQPostEventList

优点:发送者和接收者对象都不必知道任何事情。信号和槽按原样工作,包括 Qt 5 中的方法指针调用。Qt 本身使用这种压缩事件的方式。

缺点:需要包含私有 Qt 标头和强制清除QEvent::posted标志。

当零持续时间计时器被触发时,QEvent::posted可以在单独的列表中排队并在调用之外删除要删除的事件,而不是破解标志。compressEvent这具有额外的事件列表的开销,并且每个事件删除都会遍历发布的事件列表

其他方法

以其他方式执行此操作的目的是不使用 Qt 的内部结构。

L1第一个限制是不能访问私有定义的内容QMetaCallEvent。可按以下方式处理:

  1. 可以在源对象和目标对象之间连接具有与目标相同签名的信号和槽的代理对象。

  2. 在代理对象上运行QMetaCallEvent允许提取调用类型、被调用的槽 id 和参数。

  3. 代替信号槽连接,可以将事件显式发布到目标对象。目标对象或事件过滤器必须显式地从事件数据重新合成槽调用。

  4. 可以使用自定义compressedConnect实现来代替QObject::connect. 这充分暴露了信号和槽的细节。代理对象可用于queued_activate在发送者对象一侧执行压缩友好的等效操作。

L2第二个限制是不能完全重新实现QCoreApplication::compressEvent,因为事件列表是私有定义的。我们仍然可以访问被压缩的事件,我们仍然可以决定是否删除它,但是没有办法迭代事件列表。因此:

  1. 事件队列可以通过sendPostedEvents从内部递归调用来隐式访问notify(因此也可以从eventFilter()event()或从插槽)。这不会导致死锁,因为QCoreApplication::sendPostedEvents当事件通过sendEvent. 可以按如下方式过滤事件:

    • 在全球范围内重新实施QCoreApplication::notify
    • 通过在全球范围内注册一个QInternal::EventNotifyCallback,
    • 在本地通过将事件过滤器附加到对象,
    • QObject::event()通过在目标类中重新实现显式本地化。

    重复的事件仍会发布到事件队列中。notify从内部进行的递归调用会sendPostedEvents消耗相当多的堆栈空间(在 64 位指针架构上预算为 1kb)。

  2. QCoreApplication::removePostedEvents在将新事件发布到对象之前,可以通过调用来删除已经存在的事件。不幸的是,这样做QCoreApplication::compressEvent会导致死锁,因为事件队列互斥体已经被持有。

    包含指向接收器对象的指针的自定义事件类可以removePostedEvents在构造函数中自动调用。

  3. QEvent::Exit可以重新占用现有的压缩事件,例如。

    这些事件的集合是一个实现细节,可能会发生变化。QObject除了接收器指针之外,Qt 不会区分这些事件。实现需要每个(事件类型,接收器对象)元组的代理 QObject 开销。

执行

下面的代码适用于 Qt 4 和 Qt 5。在后者上,确保添加QT += core-private到您的 qmake 项目文件中,以便包含私有 Qt 标头。

其他答案中给出了不使用 Qt 内部标头的实现:

有两个事件删除代码路径,由 选择if (true)。启用的代码路径通常会保留最近的事件并且最有意义。或者,您可能希望保留最旧的事件 - 这就是禁用代码路径的作用。

截屏

#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of singal indices for one or more meatobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaObject::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that is can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
              /* Keep The Newest Call */              
              // We can't merely qSwap the existing posted event with the new one, since QEvent
              // keeps track of whether it has been posted. Deletion of a formerly posted event
              // takes the posted event list mutex and does a useless search of the posted event
              // list upon deletion. We thus clear the QEvent::posted flag before deletion.
              EventHelper::clearPostedFlag(cur.event);
              delete cur.event;
              cur.event = event;
            } else {
              /* Keep the Oldest Call */
              delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"
于 2014-01-05T22:04:25.993 回答
6

我正在尝试将我的评论变成答案。我同意您的观点,即文档缺少此信息,或者至少对我来说并不清楚,显然对您来说也是如此。

有两种选择可以获取更多信息:

1) 试用

将 qDebug() 或 printf()/fprintf() 语句放入“慢”线程的槽中,看看它打印出什么。运行几次并得出结论。

2) 确保

您需要阅读元对象编译器的源代码。moc 从源文件中得到这个。这是一个更复杂的调查,但这可能会导致确定性。

据我所知,每个信号发射都会发布一个相应的事件。然后,该事件将排队等待线程类中的单独线程。在这里可以找到相关的两个源代码文件:

void QCoreApplication::postEvent(QObject *receiver, QEvent *event, int 优先级)

类 QPostEventList :公共 QVector

有两种权衡取舍的方法:

从 data mutator slot 排队一个繁忙的 slot 操作

主要优点是在繁忙操作期间不会丢失信号。但是,这可能本质上更慢,因为它可能会处理比需要更多的操作。

这个想法是为每个处理的事件重新设置数据,但真正繁忙的操作只排队执行一次。如果有更多事件,它不一定必须是第一个事件,但这是最简单的实现。

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
    connect(this, SIGNAL(queueBusyOperationSignal()), SLOT(busyOperation()));
    ...
}

void Foo::dataUpdateSlot(const QByteArray &data)
{
    m_data = data;

    if (busyOperationQueued);
        emit queueBusyOperationSignal();
        m_busyOperationQueued = true;
    }
}

void MyClass::busyOperationSlot()
{

    // Do the busy work with m_data here

    m_busyOperationQueued = false;    
}

连接/断开

这个想法是在开始处理时断开插槽与相应信号的连接。这将确保不会捕获新的信号发射,并在线程有空闲处理下一个事件时再次将插槽连接到信号。

尽管在连接和下一个处理之间,这将在线程中有一些空闲时间,但至少这将是实现它的一种简单方法。根据此处未真正提供的更多上下文,它实际上可能甚至可以忽略不计的性能差异。

主要缺点是这会在繁忙操作期间丢失信号。

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
    ...
}

void MyClass::busyOperationSlot(const QByteArray &data)
{
    disconnect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), this, SLOT(dataUpdateSlot(const QByteArray&)));

    // Do the busy work with data here

    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
}

未来的想法

我在想是否有一个方便的 API - 例如一个类似 processEvents() 的方法,但带有一个只处理最后一个发布的事件的参数 - 用于实际告诉事件系统显式处理最后一个而不是规避问题本身。它看起来确实是这样一个 API,但是,它是私有的。

也许,有人会提交一个功能请求,以便在公共场合拥有类似的东西。

/*!
\internal
Returns \c true if \a event was compressed away (possibly deleted) and should not be added to the list.
*/
bool QCoreApplication::compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents)

相关的源代码可以在这里找到。

它似乎在QGuiApplication和中也有一个覆盖版本QApplication

至于完整性,还有这样一个方法:

void QCoreApplication::removePostedEvents(QObject * receiver, int eventType = 0) [静态]

删除使用 postEvent() 作为接收者发布的给定 eventType 的所有事件。

事件不会被调度,而是从队列中移除。您永远不需要调用此函数。如果你调用它,请注意杀死事件可能会导致接收器破坏一个或多个不变量。

如果receiver 为null,则删除所有对象的eventType 事件。如果 eventType 为 0,则为接收者删除所有事件。你不应该在 eventType 为 0 的情况下调用这个函数。如果你以这种方式调用它,请注意杀死事件可能会导致接收者破坏一个或多个不变量。

但是根据文档,这并不是您想要的。

于 2014-01-01T11:19:48.070 回答
4

这是另一种方法。它不需要更改发送者或接收者对象,但需要自定义CompressorProxy对象。这对于 Qt 4 和 Qt 5 都是可移植的,并且不需要访问 Qt 的内部。

压缩器对象必须是目标对象的子对象——带有插槽的对象。这样它就可以跟踪目标对象的线程。由于压缩器的信号附加到目标的槽,当它们在同一个线程中时,目标槽调用没有排队连接的开销。

神奇发生在emitCheck方法中:它递归地调用自己。

  1. 插槽调用结束于emitCheck.
  2. 进一步发布的事件通过调用发送sendPostedEvents
  3. 如果事件队列中有任何重复的插槽调用,它们将emitCheck再次出现。
  4. 一旦队列中的最后一个事件被拾取并且sendPostedEvents不再递归,则为给定的插槽重置一个标志,这样它的代理信号就不会被多次发出。这是所需的压缩行为。

对于任何给定的对 , 实例的排队槽调用集合,对于在传递的事件列表中被多次调用的槽CompressorProxyemitCheck将只返回一次。true

请注意,在释放模式下,每次递归调用的堆栈使用量在 32 位架构上约为 600 字节,在 64 位架构上是两倍。在 OS X 的调试模式下,使用 64 位构建,每次递归使用的堆栈约为 4kb。

截屏

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

class CompressorProxy : public QObject {
    Q_OBJECT
    bool emitCheck(bool & flag) {
        flag = true;
        QCoreApplication::sendPostedEvents(this, QEvent::MetaCall); // recurse
        bool result = flag;
        flag = false;
        return result;
    }

    bool m_slot;
    Q_SLOT void slot() {
        if (emitCheck(m_slot)) emit signal();
    }
    Q_SIGNAL void signal();

    bool m_slot_int;
    Q_SLOT void slot_int(int arg1) {
        if (emitCheck(m_slot_int)) emit signal_int(arg1);
    }
    Q_SIGNAL void signal_int(int);
public:
    // No default constructor, since the proxy must be a child of the
    // target object.
    explicit CompressorProxy(QObject * parent) : QObject(parent) {}
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        CompressorProxy * proxy = new CompressorProxy(this);
        connect(&m_signaller, SIGNAL(emptySignal()), proxy, SLOT(slot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), proxy, SLOT(slot_int(int)), Qt::QueuedConnection);
        connect(proxy, SIGNAL(signal()), this, SLOT(emptySlot()));
        connect(proxy, SIGNAL(signal_int(int)), this, SLOT(dataSlot(int)));
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"
于 2014-01-08T20:49:48.093 回答
3

这是另一种可移植到 Qt 4 和 Qt 5 的方法,并且不需要访问 Qt 的内部(除了通过公共头文件可用的内容)。在 Qt 5 上,仅支持 Qt 4 样式的连接。压缩实体是(接收器对象,槽)对。这与完全访问时使用的 (sender, receiver, signal, slot) 元组不同QMetaCallEvent

它利用QObject::qt_metacall来从黑匣子中窥探通话的详细信息QMetaCallEventsendPostedEvents就像在我的其他无内部答案中一样,使用了递归。

值得注意的是,QObject::qt_metacall至少从 Qt 4.0 开始,它的 API 就保持不变。

截屏

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>
#include <QSet>
#include <QMetaMethod>

// Common Code

/*! Keeps a list of method indices for one or more meatobject classes. */
class MethodList {
    Q_DISABLE_COPY(MethodList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
public:
    MethodList() {}
    template <class T> void add(const char * slot) {
        add(T::staticMetaObject.method(T::staticMetaObject.indexOfSlot(slot)));
    }
    void add(const QMetaMethod & method) {
        Q_ASSERT(method.methodIndex() >= 0);
        m_data[method.enclosingMetaObject()].insert(method.methodIndex());
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(method.methodIndex());
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int methodId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(methodId);
    }
};
Q_GLOBAL_STATIC(MethodList, compressedSlots)

// Compressor

class Compressor : public QObject {
    enum { Idle, Armed, Valid } m_state;
    QMetaObject::Call m_call;
    int m_methodIndex;
    QSet<int> m_armed; // armed method IDs

    int qt_metacall(QMetaObject::Call call, int id, void ** args) {
        if (m_state != Armed) return QObject::qt_metacall(call, id, args);
        m_state = Valid;
        m_call = call;
        m_methodIndex = id;
        return 0;
    }
    bool eventFilter(QObject * target, QEvent * ev) {
        Q_ASSERT(target == parent());
        if (ev->type() == QEvent::MetaCall) {
            m_state = Armed;
            if (QT_VERSION < QT_VERSION_CHECK(5,0,0) || ! *(void**)(ev+1)) {
                // On Qt5, we ensure null QMetaCallEvent::slotObj_ since we can't handle Qt5-style member pointer calls
                Compressor::event(ev); // Use QObject::event() and qt_metacall to extract metacall data
            }
            if (m_state == Armed) m_state = Idle;
            // Only intercept compressed slot calls
            if (m_state != Valid || m_call != QMetaObject::InvokeMetaMethod ||
                    ! compressedSlots()->contains(target->metaObject(), m_methodIndex)) return false;
            int methodIndex = m_methodIndex;
            m_armed.insert(methodIndex);
            QCoreApplication::sendPostedEvents(target, QEvent::MetaCall); // recurse
            if (! m_armed.contains(methodIndex)) return true; // Compress the call
            m_armed.remove(methodIndex);
        }
        return false;
    }
public:
    Compressor(QObject * parent) : QObject(parent), m_state(Idle) {
        parent->installEventFilter(this);
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    compressedSlots()->add<Widget>("emptySlot()");
    compressedSlots()->add<Widget>("dataSlot(int)");
    Widget w;
    new Compressor(&w);
    w.show();
    return a.exec();
}

#include "main.moc"
于 2014-01-10T01:26:30.877 回答
2
thread_slow 

如果您使用队列连接或 postEvent,将处理在其事件循环中发送的所有信号

来源:

Queued Connection 当控制返回到接收者线程的事件循环时调用该槽。插槽在接收者的线程中执行。

QtDoc

如果您想了解有关如何处理事件的更多详细信息,可以查看此处:

https://qt.gitorious.org/qt/qtbase/source/631c3dbc800bb9b2e3b227c0a09523f0f7eef0b7:src/corelib/thread/qthread_p.h#L127

如您所见,事件按优先级顺序排序,因此如果您的所有事件具有相同的优先级,则它是先进先出的。

这不是一项微不足道的任务,这里是一个粗略的尝试,告诉我它是否有效。

我的建议是基本上自己存储事件并仅处理最后一个事件。

thread_slow.h

int current_val;
bool m_isRunning;

thread_slow.cpp

void enqueue_slot( int val /*or whatever you value is*/ ) {
     // You'll enventually need a a QMutex here if your slot is not call in the thread
     m_current_val = val;
     if( !m_isRunning )
         slowRun();
}

void checkHasReceivedEventSlot() {
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}

void slowRun() {
    m_isRunning = true;
    int v = m_current_val;
    m_current_val = -1; // Invalid value

   // Do stuff with v

   // Let the queue fill itself with enqueue_slot calls
   QTimer::singleShot(kTIMEOUT, this, SLOT(checkHasReceivedEventSlot()));
}

第一次调用enqueue_slot,会开始慢跑

编辑:

为确保它是最后一个事件,您可以执行以下操作:

void checkHasReceivedEventSlot() {
    // Runs enqueue_slot until no more events are in the loop
    while( m_thread->eventDispatcher()->hasPendingEvents() )
         m_thread->eventDispatcher()->processEvents(QEventLoop::AllEvents);

    // m_current_val should hold the last event
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}
于 2014-01-01T10:40:40.047 回答
2

来自问题:“如果它会处理所有信号,有没有办法让 thread_slow 只处理最后一个?”

如果您只想始终处理最后一个信号,并且不介意处理一些额外的信号,只要它不会使事情变慢,那么您可以尝试使用常规QThread::exec()事件循环的非常简单的方法。将这些插槽方法放入QObject子类中,然后将其移至线程:

//slot
void MyClass::publicReceiverSlotForQueuedSignals(QString data)
{
    // Update data every time
    mReceivedData = data;

    // Allow worker method to be queued just once
    if (!mWorkerSlotInvoked) {
        mWorkerSlotInvoked = true;
        QMetaObject::invokeMethod(this, "workerSlot", Qt::QueuedConnection);
        qDebug() << "publicReceiverSlotForQueuedSignals: invoked workerSlot!"
                 << "New data:" << mReceivedData;
    } else {
        qDebug() << "publicReceiverSlotForQueuedSignals: workerSlot already invoked."
                 << "New data:" << mReceivedData;
    }
}

//slot
void MyClass::privateWorkerSlot()
{
    mWorkerSlotInvoked = false;
    qDebug() << "workerSlot for data:" << mReceivedData;
    QThread::msleep(3000);
    qDebug() << "workerSlot returning.";
}

通过publicReceiverSlotForQueuedSignals非常快(qDebuginelse可能是快速呼叫最耗时的部分),因此排队的信号数量并不重要。然后privateWorkerSlot在该线程的每个事件循环轮换中只会调用一个,无论它运行得有多慢。

mReceivedData此外,mWorkerSlotInvoked在两个插槽方法(以及您可能使用它们的任何其他地方)中添加互斥锁来保护也是微不足道的。然后您可以直接连接到插槽,因为invokeMethod它是线程安全的,并且互斥锁也可以处理线程安全的私有数据成员MyClass。只需确保将 的内容复制mReceivedData到局部变量并解锁互斥锁,然后再进行耗时的处理。

注意:未经测试的代码,可能有一些错误。

于 2014-01-01T12:51:23.140 回答
1

您可以使用DirectConnectionQueueConnection的组合:

  1. 在您的工作人员方面(thread_slow):

    • 一个公共插槽,旨在由您的任务提供者调用 ( thread_fast)

      void Working::process()
      {
         if (working)
         {
           printf("Drop a task %p\n", QThread::currentThread()); 
           return;
         }
      
        working = true;        
        emit realWork();
      }
      
    • 处理功能(很慢):realProcess()

      void Working::realProcess()
      {
          printf("      **** Working ... %p\n",QThread::currentThread()); fflush(stdout);
      
          // Emulate a big processing ...
          usleep(3000*1000);
      
          printf("      **** Done. %p\n",QThread::currentThread());fflush(stdout);
          working = false;
          emit done();
      }
      
    • 一个QueueConnectionrealWorkrealProcess

      Working::Working()
      {
          working = false;
          connect(this,SIGNAL(realWork()),this,SLOT(realProcess()),Qt::QueuedConnection);
      }
      
  2. 在您的任务提供者方面 ( thread_fast)

    • 一个startWork()信号

      void TaskProv::orderWork()
      {
          emit startWork();
      }
      
    • 与工作进程槽的DirectConnection

      QObject::connect(&taskProvider,SIGNAL(startWork()),&worker,SLOT(process()),Qt::DirectConnection);
      

一些注意事项:

  • 该函数Working::process()将在thread_fast(即使它是一个工作成员函数)中运行,但它只是检查一个标志,因此它不应该影响处理时间

  • 如果您介意潜在的额外任务丢失,您可以使用互斥锁保护 Worker 的工作标志以进行更严格的管理。

  • 这与 lpapp 的“Queue a busy slot operation from the data mutator slot”非常相似,只是连接类型需要是 Direct 和 Queue 的正确组合。

于 2015-03-24T21:40:34.777 回答
0

作为@kuba-ober 答案的注释 - 我必须更新他们的处理程序以在调用之前compressEvent(...)检查它,否则我的代码会出现段错误。我不确定为什么会发生这种情况,但我也没有尝试压缩所有事件,我的系统中只有少数事件。mce->sender() != nullptrm_compressedSignals.contains(...)

更新的代码看起来像

// Added code:
if (mce->sender() == nullptr) {
  return Base::compressEvent(event, receiver, postedEvents);
}
// end added code
if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
于 2018-12-31T15:18:58.097 回答