Qt 是否有一QIODevice
对可用于进程内点对点通信?
可以使用具体的QTCPSocket
or QLocalSocket
,但是服务器端的连接 API 有点麻烦,并且通过操作系统强制数据似乎很浪费。
以下是一个可用的基本实现。它使用内部信号槽对将数据推送到另一个端点。这样,连接的任何一端都可以存在于任何线程中,并且可以在线程之间移动两端而不会丢失数据或引发任何竞争。
私人QRingBuffer
用于代替重新发明轮子。添加QT += core-private
到.pro
文件以使其可用。Qt PIMPL 用于访问 Qt 5.7 及更高版本中的内部设备缓冲区。
如果您希望实例化一个打开的管道,通常情况下,您可以将 I/O 模式传递给构造函数。典型用途:
int main(/*…*/)
{
/*…*/
AppPipe end1 { QIODevice::ReadWrite };
AppPipe end2 { &end1, QIODevice::ReadWrite };
AppPipe end3 { &end1, QIODevice::ReadOnly };
// the pipes are open ready to use
/*…*/
}
无论您向一个管道写入什么内容,最终都会成为另一个连接管道中的可读数据,反之亦然。在上面的示例中,写入的数据end1
可以从两者独立读取。写入的数据可以从. 实际上是一个只听管道。附加管道的连接很便宜 - 将大块数据发送到多个管道没有 O(N) 成本,因为已连接管道的 read 存储从原始管道发送的整个字节数组的浅拷贝。end2
end3
end2
end1
end3
QRingBuffer
所有QIODevice
语义都成立 - 您可以连接到信号,使用带有 a或 areadyRead
的管道等。与 any 一样,您只能使用其 中的类,但另一个端点可以存在于任何线程中,并且两者都可以根据需要在线程之间移动,而不会丢失数据。QDataStream
QTextStream
QIODevice
thread()
如果另一个管道端未打开且不可读,则写入是无操作的,即使它们成功。关闭管道会清除读取和写入缓冲区,以便它可以重新打开以供重用。
管道默认缓冲写入数据,写入缓冲区可以使用 强制刷新AppPipe::flush()
,除非在QIODevice::Unbuffered
mode 中打开。
和信号在监控通过管道的数据hasIncoming
时很有用。hasOutgoing
// https://github.com/KubaO/stackoverflown/tree/master/questions/local-pipe-32317081
// This project is compatible with Qt 4 and Qt 5
#include <QtTest>
#include <private/qiodevice_p.h>
#include <private/qringbuffer_p.h>
#include <algorithm>
#include <climits>
#ifndef Q_DECL_OVERRIDE
#define Q_DECL_OVERRIDE
#endif
class AppPipePrivate : public QIODevicePrivate {
public:
#if QT_VERSION < QT_VERSION_CHECK(5,7,0)
QRingBuffer buffer;
QRingBuffer writeBuffer;
int writeBufferChunkSize;
#endif
const QByteArray *writeData;
AppPipePrivate() : writeData(0) { writeBufferChunkSize = 4096; }
};
/// A simple point-to-point intra-process pipe. The other endpoint can live in any
/// thread.
class AppPipe : public QIODevice {
Q_OBJECT
Q_DECLARE_PRIVATE(AppPipe)
static inline int intLen(qint64 len) { return std::min(len, qint64(INT_MAX)); }
Q_SLOT void _a_write(const QByteArray &data) {
Q_D(AppPipe);
if (!(d->openMode & QIODevice::ReadOnly)) return; // We must be readable.
d->buffer.append(data); // This is a chunk shipped from the source.
emit hasIncoming(data);
emit readyRead();
}
void hasOutgoingLong(const char *data, qint64 len) {
while (len) {
int const size = intLen(len);
emit hasOutgoing(QByteArray(data, size));
data += size;
len -= size;
}
}
public:
AppPipe(QIODevice::OpenMode mode, QObject *parent = 0) :
QIODevice(*new AppPipePrivate, parent) {
open(mode);
}
AppPipe(AppPipe *other, QIODevice::OpenMode mode, QObject *parent = 0) :
QIODevice(*new AppPipePrivate, parent) {
open(mode);
addOther(other);
}
AppPipe(AppPipe *other, QObject *parent = 0) :
QIODevice(*new AppPipePrivate, parent) {
addOther(other);
}
~AppPipe() Q_DECL_OVERRIDE {}
void addOther(AppPipe *other) {
if (other) {
connect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)), Qt::UniqueConnection);
connect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)), Qt::UniqueConnection);
}
}
void removeOther(AppPipe *other) {
disconnect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)));
disconnect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)));
}
void flush() {
Q_D(AppPipe);
while (!d->writeBuffer.isEmpty()) {
QByteArray const data = d->writeBuffer.read();
emit hasOutgoing(data);
emit bytesWritten(data.size());
}
}
void close() Q_DECL_OVERRIDE {
Q_D(AppPipe);
flush();
QIODevice::close();
d->buffer.clear();
}
qint64 write(const QByteArray &data) { // This is an optional optimization. The base method works OK.
Q_D(AppPipe);
QScopedValueRollback<const QByteArray*> back(d->writeData);
if (!(d->openMode & Text))
d->writeData = &data;
return QIODevice::write(data);
}
qint64 writeData(const char *data, qint64 len) Q_DECL_OVERRIDE {
Q_D(AppPipe);
bool buffered = !(d->openMode & Unbuffered);
if (buffered && (d->writeBuffer.size() + len) > d->writeBufferChunkSize)
flush();
if (!buffered
|| len > d->writeBufferChunkSize
|| (len == d->writeBufferChunkSize && d->writeBuffer.isEmpty()))
{
if (d->writeData && d->writeData->data() == data && d->writeData->size() == len)
emit hasOutgoing(*d->writeData);
else
hasOutgoingLong(data, len);
}
else
memcpy(d->writeBuffer.reserve(len), data, len);
return len;
}
bool isSequential() const Q_DECL_OVERRIDE { return true; }
Q_SIGNAL void hasOutgoing(const QByteArray &);
Q_SIGNAL void hasIncoming(const QByteArray &);
#if QT_VERSION >= QT_VERSION_CHECK(5,7,0)
// all the data is in the read buffer already
qint64 readData(char *, qint64) Q_DECL_OVERRIDE { return 0; }
#else
qint64 readData(char *data, qint64 len) Q_DECL_OVERRIDE {
Q_D(AppPipe);
qint64 hadRead = 0;
while (len && !d->buffer.isEmpty()) {
int size = d->buffer.read(data, intLen(len));
hadRead += size;
data += size;
len -= size;
}
return hadRead;
}
bool canReadLine() const Q_DECL_OVERRIDE {
Q_D(const AppPipe);
return d->buffer.indexOf('\n') != -1 || QIODevice::canReadLine();
}
qint64 bytesAvailable() const Q_DECL_OVERRIDE {
Q_D(const AppPipe);
return QIODevice::bytesAvailable() + d->buffer.size();
}
qint64 bytesToWrite() const Q_DECL_OVERRIDE {
Q_D(const AppPipe);
return QIODevice::bytesToWrite() + d->writeBuffer.size();
}
#endif
};
// ...
#include "main.moc"
一个最小的测试工具:
class TestAppPipe : public QObject {
Q_OBJECT
QByteArray data1, data2;
struct PipePair {
AppPipe end1, end2;
PipePair(QIODevice::OpenMode mode = QIODevice::NotOpen) :
end1(QIODevice::ReadWrite | mode), end2(&end1, QIODevice::ReadWrite | mode) {}
};
Q_SLOT void initTestCase() {
data1 = randomData();
data2 = randomData();
}
Q_SLOT void sizes() {
QCOMPARE(sizeof(AppPipe), sizeof(QIODevice));
}
Q_SLOT void basic() {
PipePair p;
QVERIFY(p.end1.isOpen() && p.end1.isWritable() && p.end1.isReadable());
QVERIFY(p.end2.isOpen() && p.end2.isWritable() && p.end2.isReadable());
static const char hello[] = "Hello There!";
p.end1.write(hello);
p.end1.flush();
QCOMPARE(p.end2.readAll(), QByteArray(hello));
}
static QByteArray randomData(int const size = 1024*1024*32) {
QByteArray data;
data.resize(size);
char *const d = data.data();
for (char *p = d+data.size()-1; p >= d; --p)
*p = qrand();
Q_ASSERT(data.size() == size);
return data;
}
static void randomChunkWrite(AppPipe *dev, const QByteArray &payload) {
for (int written = 0, left = payload.size(); left; ) {
int const chunk = std::min(qrand() % 82931, left);
dev->write(payload.mid(written, chunk));
left -= chunk; written += chunk;
}
dev->flush();
}
void runBigData(PipePair &p) {
Q_ASSERT(!data1.isEmpty() && !data2.isEmpty());
randomChunkWrite(&p.end1, data1);
randomChunkWrite(&p.end2, data2);
QCOMPARE(p.end1.bytesAvailable(), qint64(data2.size()));
QCOMPARE(p.end2.bytesAvailable(), qint64(data1.size()));
QCOMPARE(p.end1.readAll(), data2);
QCOMPARE(p.end2.readAll(), data1);
}
Q_SLOT void bigDataBuffered() {
PipePair p;
runBigData(p);
}
Q_SLOT void bigDataUnbuffered() {
PipePair p(QIODevice::Unbuffered);
runBigData(p);
}
Q_SLOT void cleanupTestCase() {
data1.clear(); data2.clear();
}
};
QTEST_MAIN(TestAppPipe)
# local-pipe-32317081.pro
QT = core
greaterThan(QT_MAJOR_VERSION, 4): QT = core-private testlib
else: CONFIG += qtestlib
DEFINES += \
QT_DEPRECATED_WARNINGS \
QT_DISABLE_DEPRECATED_BEFORE=0x060000 \
QT_RESTRICTED_CAST_FROM_ASCII
CONFIG += console c++14
CONFIG -= app_bundle
TEMPLATE = app
SOURCES = main.cpp