4

Qt 是否有一QIODevice对可用于进程点对点通信?

可以使用具体的QTCPSocketor QLocalSocket,但是服务器端的连接 API 有点麻烦,并且通过操作系统强制数据似乎很浪费。

4

1 回答 1

11

以下是一个可用的基本实现。它使用内部信号槽对将数据推送到另一个端点。这样,连接的任何一端都可以存在于任何线程中,并且可以在线程之间移动两端而不会丢失数据或引发任何竞争。

私人QRingBuffer用于代替重新发明轮子。添加QT += core-private.pro文件以使其可用。Qt PIMPL 用于访问 Qt 5.7 及更高版本中的内部设备缓冲区。

如果您希望实例化一个打开的管道,通常情况下,您可以将 I/O 模式传递给构造函数。典型用途:

int main(/*…*/)
{
   /*…*/
   AppPipe end1 { QIODevice::ReadWrite };
   AppPipe end2 { &end1, QIODevice::ReadWrite };
   AppPipe end3 { &end1, QIODevice::ReadOnly };
   // the pipes are open ready to use
   /*…*/
}

无论您向一个管道写入什么内容,最终都会成为另一个连接管道中的可读数据,反之亦然。在上面的示例中,写入的数据end1可以从两者独立读取。写入的数据可以从. 实际上是一个只听管道。附加管道的连接很便宜 - 将大块数据发送到多个管道没有 O(N) 成本,因为已连接管道的 read 存储从原始管道发送的整个字节数组的浅拷贝。end2end3end2end1end3QRingBuffer

所有QIODevice语义都成立 - 您可以连接到信号,使用带有 a或 areadyRead的管道等。与 any 一样,您只能使用其 中的类,但另一个端点可以存在于任何线程中,并且两者都可以根据需要在线程之间移动,而不会丢失数据。QDataStreamQTextStreamQIODevicethread()

如果另一个管道端未打开且不可读,则写入是无操作的,即使它们成功。关闭管道会清除读取和写入缓冲区,以便它可以重新打开以供重用。

管道默认缓冲写入数据,写入缓冲区可以使用 强制刷新AppPipe::flush(),除非在QIODevice::Unbufferedmode 中打开。

和信号在监控通过管道的数据hasIncoming时很有用。hasOutgoing

// https://github.com/KubaO/stackoverflown/tree/master/questions/local-pipe-32317081
// This project is compatible with Qt 4 and Qt 5
#include <QtTest>
#include <private/qiodevice_p.h>
#include <private/qringbuffer_p.h>
#include <algorithm>
#include <climits>

#ifndef Q_DECL_OVERRIDE
#define Q_DECL_OVERRIDE
#endif

class AppPipePrivate : public QIODevicePrivate {
public:
#if QT_VERSION < QT_VERSION_CHECK(5,7,0)
   QRingBuffer buffer;
   QRingBuffer writeBuffer;
   int writeBufferChunkSize;
#endif
   const QByteArray *writeData;
   AppPipePrivate() : writeData(0) { writeBufferChunkSize = 4096; }
};

/// A simple point-to-point intra-process pipe. The other endpoint can live in any
/// thread.
class AppPipe : public QIODevice {
   Q_OBJECT
   Q_DECLARE_PRIVATE(AppPipe)
   static inline int intLen(qint64 len) { return std::min(len, qint64(INT_MAX)); }
   Q_SLOT void _a_write(const QByteArray &data) {
      Q_D(AppPipe);
      if (!(d->openMode & QIODevice::ReadOnly)) return; // We must be readable.
      d->buffer.append(data); // This is a chunk shipped from the source.
      emit hasIncoming(data);
      emit readyRead();
   }
   void hasOutgoingLong(const char *data, qint64 len) {
      while (len) {
         int const size = intLen(len);
         emit hasOutgoing(QByteArray(data, size));
         data += size;
         len -= size;
      }
   }
public:
   AppPipe(QIODevice::OpenMode mode, QObject *parent = 0) :
      QIODevice(*new AppPipePrivate, parent) {
      open(mode);
   }
   AppPipe(AppPipe *other, QIODevice::OpenMode mode, QObject *parent = 0) :
      QIODevice(*new AppPipePrivate, parent) {
      open(mode);
      addOther(other);
   }
   AppPipe(AppPipe *other, QObject *parent = 0) :
      QIODevice(*new AppPipePrivate, parent) {
      addOther(other);
   }
   ~AppPipe() Q_DECL_OVERRIDE {}
   void addOther(AppPipe *other) {
      if (other) {
         connect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)), Qt::UniqueConnection);
         connect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)), Qt::UniqueConnection);
      }
   }
   void removeOther(AppPipe *other) {
      disconnect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)));
      disconnect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)));
   }
   void flush() {
      Q_D(AppPipe);
      while (!d->writeBuffer.isEmpty()) {
         QByteArray const data = d->writeBuffer.read();
         emit hasOutgoing(data);
         emit bytesWritten(data.size());
      }
   }
   void close() Q_DECL_OVERRIDE {
      Q_D(AppPipe);
      flush();
      QIODevice::close();
      d->buffer.clear();
   }
   qint64 write(const QByteArray &data) { // This is an optional optimization. The base method works OK.
      Q_D(AppPipe);
      QScopedValueRollback<const QByteArray*> back(d->writeData);
      if (!(d->openMode & Text))
         d->writeData = &data;
      return QIODevice::write(data);
   }
   qint64 writeData(const char *data, qint64 len) Q_DECL_OVERRIDE {
      Q_D(AppPipe);
      bool buffered = !(d->openMode & Unbuffered);
      if (buffered && (d->writeBuffer.size() + len) > d->writeBufferChunkSize)
         flush();
      if (!buffered
          || len > d->writeBufferChunkSize
          || (len == d->writeBufferChunkSize && d->writeBuffer.isEmpty()))
      {
         if (d->writeData && d->writeData->data() == data && d->writeData->size() == len)
            emit hasOutgoing(*d->writeData);
         else
            hasOutgoingLong(data, len);
      }
      else
         memcpy(d->writeBuffer.reserve(len), data, len);
      return len;
   }
   bool isSequential() const Q_DECL_OVERRIDE { return true; }
   Q_SIGNAL void hasOutgoing(const QByteArray &);
   Q_SIGNAL void hasIncoming(const QByteArray &);
#if QT_VERSION >= QT_VERSION_CHECK(5,7,0)
   // all the data is in the read buffer already
   qint64 readData(char *, qint64) Q_DECL_OVERRIDE { return 0; }
#else
   qint64 readData(char *data, qint64 len) Q_DECL_OVERRIDE {
      Q_D(AppPipe);
      qint64 hadRead = 0;
      while (len && !d->buffer.isEmpty()) {
         int size = d->buffer.read(data, intLen(len));
         hadRead += size;
         data += size;
         len -= size;
      }
      return hadRead;
   }
   bool canReadLine() const Q_DECL_OVERRIDE {
      Q_D(const AppPipe);
      return d->buffer.indexOf('\n') != -1 || QIODevice::canReadLine();
   }
   qint64 bytesAvailable() const Q_DECL_OVERRIDE {
      Q_D(const AppPipe);
      return QIODevice::bytesAvailable() + d->buffer.size();
   }
   qint64 bytesToWrite() const Q_DECL_OVERRIDE {
      Q_D(const AppPipe);
      return QIODevice::bytesToWrite() + d->writeBuffer.size();
   }
#endif
};

// ...

#include "main.moc"

一个最小的测试工具:

class TestAppPipe : public QObject {
   Q_OBJECT
   QByteArray data1, data2;
   struct PipePair {
      AppPipe end1, end2;
      PipePair(QIODevice::OpenMode mode = QIODevice::NotOpen) :
         end1(QIODevice::ReadWrite | mode), end2(&end1, QIODevice::ReadWrite | mode) {}
   };
   Q_SLOT void initTestCase() {
      data1 = randomData();
      data2 = randomData();
   }
   Q_SLOT void sizes() {
      QCOMPARE(sizeof(AppPipe), sizeof(QIODevice));
   }
   Q_SLOT void basic() {
      PipePair p;
      QVERIFY(p.end1.isOpen() && p.end1.isWritable() && p.end1.isReadable());
      QVERIFY(p.end2.isOpen() && p.end2.isWritable() && p.end2.isReadable());
      static const char hello[] = "Hello There!";
      p.end1.write(hello);
      p.end1.flush();
      QCOMPARE(p.end2.readAll(), QByteArray(hello));
   }
   static QByteArray randomData(int const size = 1024*1024*32) {
      QByteArray data;
      data.resize(size);
      char *const d = data.data();
      for (char *p = d+data.size()-1; p >= d; --p)
         *p = qrand();
      Q_ASSERT(data.size() == size);
      return data;
   }
   static void randomChunkWrite(AppPipe *dev, const QByteArray &payload) {
      for (int written = 0, left = payload.size(); left; ) {
         int const chunk = std::min(qrand() % 82931, left);
         dev->write(payload.mid(written, chunk));
         left -= chunk; written += chunk;
      }
      dev->flush();
   }
   void runBigData(PipePair &p) {
      Q_ASSERT(!data1.isEmpty() && !data2.isEmpty());
      randomChunkWrite(&p.end1, data1);
      randomChunkWrite(&p.end2, data2);
      QCOMPARE(p.end1.bytesAvailable(), qint64(data2.size()));
      QCOMPARE(p.end2.bytesAvailable(), qint64(data1.size()));
      QCOMPARE(p.end1.readAll(), data2);
      QCOMPARE(p.end2.readAll(), data1);
   }
   Q_SLOT void bigDataBuffered() {
      PipePair p;
      runBigData(p);
   }
   Q_SLOT void bigDataUnbuffered() {
      PipePair p(QIODevice::Unbuffered);
      runBigData(p);
   }
   Q_SLOT void cleanupTestCase() {
      data1.clear(); data2.clear();
   }
};

QTEST_MAIN(TestAppPipe)
# local-pipe-32317081.pro
QT = core
greaterThan(QT_MAJOR_VERSION, 4): QT = core-private testlib
else: CONFIG += qtestlib
DEFINES += \
  QT_DEPRECATED_WARNINGS \
  QT_DISABLE_DEPRECATED_BEFORE=0x060000 \
  QT_RESTRICTED_CAST_FROM_ASCII
CONFIG += console c++14
CONFIG -= app_bundle
TEMPLATE = app
SOURCES = main.cpp
于 2015-08-31T18:08:12.413 回答