2

首先,我已经阅读了QThread并使用了QEventLoop但我并不完全确定我的实现是否正确。

TL;DR 请参阅下面的问题详细信息

最有用的信息来源是Qt WikiKDAB Qthread演示文稿(对 w/&w/o 事件循环有用)、此处此处与此问题相关的 SO 帖子。

我的情况是:

我有一个可能运行时间很长的函数,其中包含多个 I/O 磁盘调用。因此我需要一个线程来不阻塞 UI。为此,我自己实现了一个线程。

TL;DR QThreads

我的理解是QThread是一个单独的事件循环对象,并且需要自定义run()实现或将对象移动到新创建的线程对象,其中移动的对象存在(并运行)。我所描述的是带有事件循环实现的。

问题

我可能会遗漏一些东西,因为我上面描述的这个实现不能正常工作。我怎么知道这一点,上面的 Qt Docs 和 SO 帖子提到QThread::quit()QThread::exit()依赖于QEventLoop,如果QThread::exec()没有运行(通过调用QThread::run()通过QThread::start() ),那么quit()exit()函数将永远不会运行,这是我的问题

我的实现理念类似于 Java 的 Thread & Lambda 语法,例如

new Thread(() -> { // some code to run in a different thread}).start();

我使用了以下实现

可以使用 lambda 的各种线程对象容器

QWorkerThread: public QObject

    // This is the thread that runs object below
----QWaitThread : public QThread

    // This is the object which lives inside the above thread
----ThreadWorker : public QObject, public QInterruptable

简单的示例用法是(在内部完成线程和子对象清理QWorkerThread):

QWorkerThread *workerThread = new QWorkerThread;
workerThread->setRunnable([](){
    // insert CPU intensive or long running task here
});
workerThread->start();

问题详情/示例

// somewhere in main UI thread
workerThread->stop(); // or workerThread->kill() 

调用QThread::quit()or , QThread::quit()then不会终止线程。在 lambda (inside ) 中定义的长时间运行的进程将一直运行到完成。QThread::terminate()QThread::wait()setRunnable()

我知道这篇文章比传统的要长,但我希望每个人都能全面了解我正在努力实现的目标,因为我不确定我的问题到底出在哪里。

任何帮助将不胜感激!


代码实现

我将发布所有代码以全面了解实现,以防我错过重要的事情。

QWaitThread.h是一个实现QThread

#ifndef QWAITTHREAD_H
#define QWAITTHREAD_H

#include <QObject>
#include <QThread>
#include <QWaitCondition>
#include <QMutex>

class QWaitThread : public QThread
{
    Q_OBJECT
public:
    explicit QWaitThread(QObject *parent = nullptr);
    ~QWaitThread();
    virtual void pause();
    virtual void resume();    

signals:
    void paused();
    void resumed();

public slots:
    void pausing();
    void resuming();

private:
    QWaitCondition *waitCondition;
    QMutex mutex;
};

#endif // QWAITTHREAD_H

QWaitThread.cpp

#include "qwaitthread.h"

QWaitThread::QWaitThread(QObject *parent) : QThread(parent)
{
    waitCondition = new QWaitCondition;
}

QWaitThread::~QWaitThread()
{
    if(waitCondition != nullptr) {
        delete waitCondition;
    }
}

void QWaitThread::pause()
{
    emit paused();
    waitCondition->wait(&mutex);
}

void QWaitThread::resume()
{
    waitCondition->wakeAll();
    emit resumed();
}

void QWaitThread::pausing()
{
    pause();
}

void QWaitThread::resuming()
{
    resume();
}

QInterruptable.h接口定义了一些预期的功能

#ifndef QINTERRUPTABLE_H
#define QINTERRUPTABLE_H

class QInterruptable {
public:
    virtual void pause() = 0;
    virtual void resume() = 0;
    virtual void interrupt() = 0;
    virtual ~QInterruptable() = default;
};

#endif // QINTERRUPTABLE_H

ThreadWorker.h是存在(并运行)内部的对象QWaitThread

#ifndef THREADWORKER_H
#define THREADWORKER_H

#include <QObject>
#include <functional>
#include <QWaitCondition>
#include <QMutex>

#include "QInterruptable.h"

class ThreadWorker : public QObject, public QInterruptable
{
    Q_OBJECT

private:
    QMutex mutex;
    QWaitCondition *waitCondition;
    std::function<void ()> runnable;
    bool shouldPause = false;

public:
    explicit ThreadWorker(QObject *parent = nullptr);
    ThreadWorker(std::function<void ()> func);
    ~ThreadWorker();

    void setRunnable(const std::function<void ()> &value);

signals:
    /**
     * Emitted when the QWorkerThread object has started work
     * @brief started
     */
    void started();

    /**
     * @brief progress reports on progress in method, user defined.
     * @param value reported using int
     */
    void progress(int value);

    /**
     * Emitted when the QWorkerThread object has finished its work, same signal is used from &QThread::finished
     * @brief started
     */
    void finished();

    /**
     * Emitted when the QWorkerThread has encountered an error, user defined.
     * @brief started
     */
    void error();

public slots:
    virtual void run();
    virtual void cleanup();


    // QInterruptable interface
public:
    void pause()
    {
        shouldPause = true;
    }
    void resume()
    {
        shouldPause = false;
    }
    QMutex& getMutex();
    QWaitCondition *getWaitCondition() const;
    void setWaitCondition(QWaitCondition *value);
    bool getShouldPause() const;

    // QInterruptable interface
public:
    void interrupt()
    {

    }
};

#endif // THREADWORKER_H

线程工作者.cpp

#include "threadworker.h"

void ThreadWorker::setRunnable(const std::function<void ()> &value)
{
    runnable = value;
}

QMutex& ThreadWorker::getMutex()
{
    return mutex;
}

QWaitCondition *ThreadWorker::getWaitCondition() const
{
    return waitCondition;
}

void ThreadWorker::setWaitCondition(QWaitCondition *value)
{
    waitCondition = value;
}

bool ThreadWorker::getShouldPause() const
{
    return shouldPause;
}

ThreadWorker::ThreadWorker(QObject *parent) : QObject(parent)
{
    waitCondition = new QWaitCondition;
}

ThreadWorker::ThreadWorker(std::function<void ()> func): runnable(func) {
    waitCondition = new QWaitCondition;
}


ThreadWorker::~ThreadWorker()
{    
    if(waitCondition != nullptr){
        delete waitCondition;
    }
}

void ThreadWorker::run()
{
    emit started();
    runnable();
    emit finished();
}

void ThreadWorker::cleanup()
{

}

QWorkerThread.h感兴趣的主类,接受可运行的lambda,主“线程”处理发生在哪里,移动到线程,启动线程,处理事件等

#ifndef QWORKERTHREAD_H
#define QWORKERTHREAD_H

#include <QObject>
#include <functional>
#include <QThread>
#include <QEventLoop>

#include "qwaitthread.h"
#include "threadworker.h"

class QWorkerThread: public QObject
{
    Q_OBJECT

public:

    enum State {
        Running,
        Paused,
        NotRunning,
        Finished,
        Waiting,
        Exiting
    };

    QWorkerThread();
    explicit QWorkerThread(std::function<void ()> func);
    ~QWorkerThread();
    static QString parseState(QWorkerThread::State state);
    virtual void setRunnable(std::function <void()> runnable);
    virtual void start(QThread::Priority priority = QThread::Priority::InheritPriority);
    virtual void stop();
    virtual void wait(unsigned long time = ULONG_MAX);
    virtual void kill();
    virtual void setWorkerObject(ThreadWorker *value);

    virtual void pause();
    virtual void resume();
    virtual QWaitThread *getWorkerThread() const;

    State getState() const;


signals:
    /**
     * Emitted when the QWorkerThread object has started work
     * @brief started
     */
    void started();

    /**
     * @brief progress reports on progress in method, user defined.
     * @param value reported using int
     */
    void progress(int value);

    /**
     * Emitted when the QWorkerThread object has finished its work, same signal is used from &QThread::finished
     * @brief started
     */
    void finished();

    /**
     * Emitted when the QWorkerThread has encountered an error, user defined.
     * @brief started
     */
    void error();

private:
    /**
     * @brief workerObject - Contains the object and 'method' that will be moved to `workerThread`
     */
    ThreadWorker *workerObject = nullptr;

    /**
     * @brief workerThread - Worker Thread is seperate thread that runs the method
     */
    QWaitThread *workerThread = nullptr;

    State state = State::NotRunning;

};

#endif // QWORKERTHREAD_H

QWorkerThread.cpp实现

#include "qworkerthread.h"

QWorkerThread::QWorkerThread()
{
    state = State::NotRunning;
    workerThread = new QWaitThread;
    workerObject = new ThreadWorker;
    workerThread->setObjectName("WorkerThread");
}

QWorkerThread::QWorkerThread(std::function<void ()> func)
{
    state = State::NotRunning;
    workerThread = new QWaitThread;
    workerObject = new ThreadWorker(func);
    workerThread->setObjectName("WorkerThread");
}

QWorkerThread::~QWorkerThread()
{
    //  Check if worker thread is running
    if(workerThread->isRunning()) {

        // Exit thread with -1
        workerThread->exit(-1);
    }

    if(!workerThread->isFinished()) {
        workerThread->wait(500);

        if(workerThread->isRunning()) {
            workerThread->terminate();
        }
    }

    // cleanup
    delete workerObject;
    delete workerThread;
}

void QWorkerThread::setRunnable(std::function<void ()> runnable)
{
    workerObject->setRunnable(runnable);
}

void QWorkerThread::start(QThread::Priority priority)
{

    state = State::Running;
    // Connect workerThread start signal to ThreadWorker object's run slot
    connect(workerThread, &QThread::started, workerObject, &ThreadWorker::started);
    connect(workerThread, &QThread::started, workerObject, &ThreadWorker::run);

    // Connect threadWorker progress report to this progress report
    connect(workerObject, &ThreadWorker::progress, this, &QWorkerThread::progress);

    // Cleanup
    connect(workerObject, &ThreadWorker::finished, this, [this](){
        state = State::Finished;
        emit finished();
    });
    connect(workerThread, &QWaitThread::finished, this, [this] {
        workerObject->deleteLater();
    });

    // move workerObject to thread
    workerObject->moveToThread(workerThread);

    // emit signal that we are starting
    emit started();

    // Start WorkerThread which invokes object to start process method
    workerThread->start(priority);
}

void QWorkerThread::stop()
{
    state = State::Exiting;
    // Exit thread safely with success
    workerThread->quit();

    emit finished();
}

void QWorkerThread::wait(unsigned long time)
{
    state = State::Waiting;
    workerThread->wait(time);
}

void QWorkerThread::kill()
{
    // try stopping
    stop();

    // check if still running
    if(workerThread->isRunning()){
        // forcefully kill
        workerThread->terminate();
        workerThread->wait();
    }

    emit finished();
}

void QWorkerThread::setWorkerObject(ThreadWorker *value)
{
    workerObject = value;
}

QWaitThread *QWorkerThread::getWorkerThread() const
{
    return workerThread;
}

QWorkerThread::State QWorkerThread::getState() const
{
    return state;
}

QString QWorkerThread::parseState(QWorkerThread::State state) {
    switch (state) {
        case Running:
            return "Running";
        case Paused:
            return "Paused";
        case NotRunning:
            return "NotRunning";
        case Finished:
            return "Finished";
        case Waiting:
            return "Waiting";
        case Exiting:
            return "Exiting";
    }

    return QString("Unknown State [%1]").arg(QString::number(state)) ;
}

void QWorkerThread::pause()
{
    workerObject->pause();
    state = State::Paused;
}

void QWorkerThread::resume()
{
    workerObject->resume();
    state = State::Running;
}

更新一些额外的信息

关于~QWorkerThread(),我注意到在调用delete QThreador时QThread::deleteLater()QWaitThread()(or QThread) 会抛出一个致命错误:线程在运行时被销毁。这是在quit()/terminate()被调用之后。

QThread.cpp中的以下行

if (d->running && !d->finished && !d->data->isAdopted)
    qFatal("QThread: Destroyed while thread is still running");

在哪里

d->running == true
d->finished == false
d->data->isAdopted ?
4

1 回答 1

1

我已经测试了你的代码,这就是我意识到的。
正如你所提到的,terminate()并没有完全停止一个线程。Qt 文档说:

终止线程的执行。线程可能会或可能不会立即终止,具体取决于操作系统的调度策略。确定后使用。QThread::wait()terminate()

不幸的是,wait()即使在terminate(). 您的代码可能有问题,但我创建了一个最大程度简化的示例来验证这一点,它仍然存在相同的问题。

首先,这是我建议更改的代码部分:

QWorkerThread::~QWorkerThread()
{
    ...
    // cleanup
    delete workerObject; // Unsafe, but the only way to call the destructor, if necessary
    delete workerThread; // qFatal
}

这是 Qt doc 关于析构函数不安全的说法:

QObject从拥有对象的线程(或以其他方式访问对象)以外的线程调用 delete是不安全的,除非您保证该对象此时没有处理事件。改为使用QObject::deleteLater(),并且DeferredDelete将发布一个事件,对象线程的事件循环最终将拾取该事件。默认情况下,拥有 aQObject的线程是创建 的线程QObject,但不是在QObject::moveToThread()被调用之后。

笔记。在没有. delete workerThread_workerThread->deleteLater()qFatal


好的,我们实际上有什么问题:

  1. 由于以下原因,QThread不能直接调用子类的析构函数terminate()qFatal
  2. wait()terminate()尽管有文档,但冻结并且无法使用

(似乎问题仅在将无限操作移入事件循环时才是实际的)

确保问题不在代码的其他位置 最小可重现示例

工人.h

#pragma once

#include <QObject>

class Worker : public QObject
{
    Q_OBJECT

public:
    ~Worker();

public slots:
    void process();
};

工人.cpp

#include "Worker.h"
#include <QThread>
#include <QDebug>
#include <QDateTime>

Worker::~Worker()
{
    qDebug() << "~Worker()";
}

void Worker::process()
{
    qDebug("Hello World!");

    while(true)
    {
        qDebug() << QDateTime::currentDateTime();
        QThread::msleep(100);
    }
}

主Win.h

#pragma once

#include <QtWidgets/QMainWindow>

class QThread;
class Worker;

class MainWin : public QMainWindow
{
    Q_OBJECT

public:
    MainWin(QWidget *parent = nullptr);
    ~MainWin();

private:
    QThread*    thread = nullptr;
    Worker*     worker = nullptr;
};

主Win.cpp

#include "MainWin.h"
#include "Worker.h"
#include <QThread>
#include <QDebug>
#include <QDateTime>

MainWin::MainWin(QWidget *parent)
  : QMainWindow(parent)
{
    thread = new QThread;
    worker = new Worker;

    worker->moveToThread(thread);

    // Start only one infinite operation
    connect(thread, &QThread::started, worker, &Worker::process);

    thread->start();
}

MainWin::~MainWin()
{
    if (thread->isRunning())
    {
        thread->exit(-1);
        thread->wait(500);
    }

    if (thread->isRunning())
    {
        thread->terminate();
    }

    //cleanup
    delete worker;
    delete thread; // qFatal("QThread: Destroyed while thread is still running")
}

我发现的唯一工作代码

MainWin::~MainWin()
{
    ...
    //cleanup
    delete worker; // Worker destructor will be called, but be note this is unsafe
    thread->deleteLater(); // Allows to avoid qFatal but make thread terminated
}

结论和建议

除了完全避免之外,我所能提供terminate()的一切就是在terminate()没有wait()and then的情况下使用workerThread->deleteLater()

如果您尝试终止的耗时操作是您自己的代码,请考虑在代码中嵌入一些终止标志。

在可能的情况下,最好避免使用原始指针并用智能指针替换它们。


我还能提供什么作为在线程中运行 lambda 的通用方式

简化示例如何使用 lamdas、信号槽、线程、开始完成信号QtConcurrent::run()QFuture<>. 通过这种方式,您既可以在一个持久的附加线程中运行代码,也可以在自动线程池中实现代码运行。但不支持终止。

LambdaThread.h

#pragma once

#include <QObject>
#include <functional>
#include <QFuture>

class QThreadPool;

class LambdaThread : public QObject
{
    Q_OBJECT

public:
    // maxThreadCount = -1 to use idealThreadCount by default
    LambdaThread(QObject *parent, int maxThreadCount = -1);

signals:
    void started();
    void finished();

public slots:
    // Invoke this directly or by a signal
    QFuture<void> setRunnable(std::function<void()> func);

private:
    /*
    For the case you need persistent thread sometimes.
    In the case you never need persistent thread,
    just remove m_threadPool from this class at all
    */
    QThreadPool* m_threadPool = nullptr;
};

LambdaThread.cpp

#include "LambdaThread.h"
#include <QtConcurrent/QtConcurrent>
#include <QThreadPool>

LambdaThread::LambdaThread(QObject *parent, int maxThreadCount /*= -1*/)
    : QObject(parent)
{
    m_threadPool = new QThreadPool(this);

    if(maxThreadCount > 0)
    {
        m_threadPool->setMaxThreadCount(maxThreadCount);

        if (maxThreadCount == 1)
        {
            // Avoid thread affinity changing
            m_threadPool->setExpiryTimeout(-1);
        }
    }
}

QFuture<void> LambdaThread::setRunnable(std::function<void()> func)
{
    return QtConcurrent::run(m_threadPool,
        [this, func]()
    {
        // Be note that you actually need event loop in a receiver thread only
        emit started();

        func();

        emit finished();
    });
}

只是 GUI 类示例,您可以在其中启动可运行文件并接收信号。

主Win.h

#pragma once

#include <QtWidgets/QMainWindow>
#include <functional>

class LambdaThread;

class MainWin : public QMainWindow
{
    Q_OBJECT

public:
    MainWin(QWidget *parent = nullptr);

signals:
    // For the case you want to use signals
    void newRunnable(std::function<void()> func);

private:
    LambdaThread* m_lambdaThread = nullptr;
};

主Win.cpp

#include "MainWin.h"
#include "LambdaThread.h"
#include <QFuture>
#include <QDebug>

MainWin::MainWin(QWidget *parent)
    : QMainWindow(parent)
{
    m_lambdaThread = new LambdaThread(this);

    connect(this, &MainWin::newRunnable,
        m_lambdaThread, &LambdaThread::setRunnable);

    /*
    Do not forget the third (`this`) context variable
    while using modern signal-slot connection syntax with lambdas
    */
    connect(m_lambdaThread, &LambdaThread::started,
        this, []()
    {
        qDebug() << "Runnable stated";
    });

    connect(m_lambdaThread, &LambdaThread::finished,
        this, []()
    {
        qDebug() << "Runnable finished";
    });

    // Set your lambda directly
    QFuture<void> future = m_lambdaThread->setRunnable([]()
    {
        qDebug() << "hello from threaded runnable";
    });

    // You can also use future (not necessary of course)
    //future.waitForFinished();

    // Or you can emit your lambda via the signal:
    emit newRunnable([]()
    {
        qDebug() << "hello from threaded runnable which comes from signal";
    });
}
于 2020-03-31T11:51:53.783 回答