1

我创建了这个服务器类,它在新连接进入时启动一个线程。在某些情况下它可以正常工作,但它不是很稳定。我正在尝试解决它在哪里中断。我的调试器告诉我一些关于 qmutex 的信息。如果有人能发现问题。泰

它通过信号和插槽与父级连接并获取数据。

这是标题:

#ifndef FORTUNESERVER_H
#define FORTUNESERVER_H

#include <QStringList>
#include <QTcpServer>
#include <QThread>
#include <QTcpSocket>
#include <string>
using namespace  std;


class FortuneServer : public QTcpServer
{
    Q_OBJECT

 public:
    FortuneServer(QObject *parent = 0);

public slots:


void procesServerString(string serverString);
void getStringToThread(string serverString);

protected:
void incomingConnection(int socketDescriptor);

private:
QStringList fortunes;

signals:

void procesServerStringToParent(string serverString);
void getStringToThreadSignal(string serverString);
};


class FortuneThread : public QObject
 {
Q_OBJECT

public:
FortuneThread(int socketDescriptor, QObject *parent);

public slots:

void getString();
void sendString(string sendoutString);

signals:

void error(QTcpSocket::SocketError socketError);
void fromThreadString(string serverString);
void finished();


private:
int socketDescriptor;
QString text;
QTcpSocket tcpSocket;
};

#endif

和抄送:

#include <stdlib.h>
#include <QtNetwork>
#include "MeshServer.hh"
#include <iostream>
#include "TableView.hh"

using namespace  std;

FortuneServer::FortuneServer(QObject *parent)
: QTcpServer(parent)
{

}

void FortuneServer::procesServerString(string serverString){

emit procesServerStringToParent(serverString);

}
void FortuneServer::getStringToThread(string serverString){

emit getStringToThreadSignal(serverString);

}

void FortuneServer::incomingConnection(int socketDescriptor)
{


FortuneThread *serverthread = new FortuneThread(socketDescriptor, this);
//connect(&serverthread, SIGNAL(finished()), &serverthread, SLOT(deleteLater()));


QThread* thread = new QThread;

serverthread->moveToThread(thread);

connect(thread, SIGNAL(started()), serverthread, SLOT(getString()));
connect(serverthread, SIGNAL(fromThreadString(string)), this,        SLOT(procesServerString(string)));
connect(this, SIGNAL(getStringToThreadSignal(string)), serverthread, SLOT(sendString(string)));

connect(serverthread, SIGNAL(finished()), thread, SLOT(quit()));
connect(serverthread, SIGNAL(finished()), serverthread, SLOT(deleteLater()));
connect(serverthread, SIGNAL(finished()), thread, SLOT(deleteLater()));

thread->start();

}



FortuneThread::FortuneThread(int socketDescriptor, QObject *parent)
: QObject(parent), socketDescriptor(socketDescriptor)
{



}

void FortuneThread::getString()
{

if (!tcpSocket.setSocketDescriptor(socketDescriptor)) {
    emit error(tcpSocket.error());
    cout<<"socket error"<<endl;
    return;
}
//in part
if(!tcpSocket.waitForReadyRead(10000)){

    emit finished();
    return;
}
int joj = tcpSocket.bytesAvailable();
char inbuffer[1024];
tcpSocket.read(inbuffer,1024);
string instring;
instring = inbuffer;
instring.resize(joj);

emit fromThreadString(instring);

}   


void FortuneThread::sendString(string sendoutString)
{       


//out part
char buffer[1024];
int buffer_len = 1024;
int bytecount;

memset(buffer, '\0', buffer_len);


string outstring = sendoutString;



int TempNumOne= (int)outstring.size();

for (int a=0;a<TempNumOne;a++)
    {
        buffer[a]=outstring[a];
    }

QByteArray block;
block = buffer;



tcpSocket.write(block);
tcpSocket.disconnectFromHost();
tcpSocket.waitForDisconnected();
emit finished();
}

这是来自父母:

//server start

QHostAddress adr;
adr.setAddress( QString("127.0.0.1") );
adr.toIPv4Address();
quint16 port = 1101;

if (!server.listen( adr, port)) {
  QMessageBox::critical(this, tr("CR_bypasser"),
      tr("Unable to start the server: %1.")
      .arg(server.errorString()));
  close();
  return;
}

QString ipAddress;
ipAddress = server.serverAddress().toString();
statusLabel->setText(tr("The server is running on\n\nIP: %1\nport: %2\n\n"
  "Run the Fortune Client example now.")
  .arg(ipAddress).arg(server.serverPort()));

connect (&server, SIGNAL(procesServerStringToParent(string)), this,  SLOT(procesServerString(string))); 
connect (this, SIGNAL(StringToServer(string)), &server, SLOT(getStringToThread(string))); 

编辑:我正在尝试做的事情:

我有一个客户端(游戏引擎(Cryengine)的一部分),我用它发送了一串游戏坐标和其他一些东西,就像它在我之前给出的链接中所做的那样。这工作正常。我在“127.0.0.1”端口 1101 上获取数据。现在我只需要在我自己的程序中评估这些数据,该程序有这个 TableView 类,我可以在其中收集从字符串中获取的坐标,从坐标中调用一些数据和然后通过服务器将这个新字符串返回给游戏引擎。在游戏中,我将单击对象获取它们的坐标,从中创建一个字符串(包含坐标、实体 ID 等),将此字符串发送到服务器,从 TableView 返回调用信息。我只需要这种单向流只有一个正在发送字符串的客户端。我不确定 recv(hsock, buffer, buffer_len, 0), 我猜在游戏中负责发送字符串的节点会等待返回字符串?这是我的第一个程序之一,我真的很困惑......

4

2 回答 2

6

您提供的代码是货物崇拜编码的典范:您做了各种不必要的事情,显然是希望解决问题。

可能的破坏者...

代码有很多问题,但我认为崩溃的原因是:tcpSocket.write(block)没有发送一个以零结尾的字符串。该块是零终止的,但对字节数组的分配不会将此零终止添加到size()QByteArray 中。以下代码打印 1,即使字节数组的内容内部有一个零终止字节。

QByteArray arr = "a";
qDebug("len=%d", arr.size());

接收代码期望零终止,但从不接收它。然后,您继续将非零终止缓冲区分配给std::string

string instring;
instring = inbuffer;
instring.resize(joj);

std::string & std::string::operator=(const char*)随后的调整大小是货物崇拜:您很可能在已经读取缓冲区之后尝试解决问题。

不要认为这意味着修复它是正确的方法。一点也不。正确的做法是删除您编写的代码并正确执行,而不需要大量无用的不必要的咒语。

...以及所有其他问题

你陷入了相信魔法的陷阱,在各种论坛上无休止地存在。

线程不是神奇的对象,您可以将其应用于任何问题,希望它们有所帮助。我不知道是什么让人们认为线程很神奇,但经验法则是:如果有人告诉你“哦,你应该试试线程”,他们很可能是错的。如果他们说与网络有关,他们几乎永远不会对,他们没有帮助,而且他们根本不了解您的问题(您似乎也不了解)。通常,除非您清楚地了解您的问题,否则线程将无济于事。Qt 的网络系统是异步的:如果您不使用这些waitxxxx()函数,它不会阻止代码的执行。顺便说一句,你不应该使用它们,所以这里一切都很好。不需要无数线程。

因此,完全没有必要为每个传入连接启动一个新线程。它会降低服务器的性能——特别是如果服务器执行简单处理,因为您将上下文切换和线程创建/拆除的开销添加到每个连接。您希望系统中每个内核的线程数少于 2,因此使用QThread::idealThreadCount()池中的线程数将是一个很好的起点。

您还剥夺了线程的好处,因为您使用网络线程仅接收数据,然后发出fromThreadString(string)信号。我认为该信号已发送到您的应用程序的主线程。现在这很愚蠢,因为从网络套接字接收一堆字节是微不足道的。您的线程不做任何工作,他们所做的所有工作都浪费在创建和删除上。

下面的代码是一个简单的示例,说明如何正确使用 Qt API 来实现客户端-服务器系统,该系统以循环方式跨物理内核分配工作。它应该表现得很好。Qt 中包含的 Fortune 客户端示例确实非常不幸,因为它恰恰是错误的处理方式。

会注意到的是:

  1. 这并不完全是微不足道的。Qt 可能更有帮助,但不是。

  2. 客户端和发送者都从线程池移到线程中。

  3. 断开连接的客户端不会被删除,而只是返回到线程池保存的客户端列表中。当调用客户端时,它们会被重用。

  4. QThread 不是派生自。QTcpServer 仅派生用于访问套接字句柄。

  5. 不使用名称以开头的函数wait()。一切都是异步处理的。

  6. newConnection(int)ThreadPool 为Client 的 slot保留一个查找的 QMetaMethod 。这比使用更快,QMetaObject::invokeMethod()因为它每次都必须查找内容。

  7. 在主线程中运行的计时器通过删除第一个发送者来启动信号槽链。每个发件人的删除都会触发下一个发件人的删除。最终,最后一个发送者在线程池中设置了quit()槽。后者finished()在所有线程确实完成时发出信号。

#include <QtCore/QCoreApplication>
#include <QtNetwork/QTcpServer>
#include <QtNetwork/QTcpSocket>
#include <QtCore/QQueue>
#include <QtCore/QThread>
#include <QtCore/QTimer>
#include <QtCore/QMetaMethod>

// Processes data on a socket connection
class Client : public QObject
{
    Q_OBJECT
public:
    Client(QObject* parent = 0) : QObject(parent), socket(new QTcpSocket(this))
    {
        connect(socket, SIGNAL(readyRead()), SLOT(newData()));
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                SLOT(newState(QAbstractSocket::SocketState)));
        qDebug("Client()");
    }
    ~Client() { qDebug("~Client()"); }
signals:
    void done();
public slots:
    void newConnection(int descriptor) {
        socket->setSocketDescriptor(descriptor);
    }
private slots:
    void newData() {
        QByteArray data = socket->readAll();
        if (0) qDebug("got %d bytes", data.size());
        if (0) qDebug("got a string %s", data.constData());
        // here we can process the data
    }
    void newState(QAbstractSocket::SocketState state) {
        qDebug("client new state %d", state);
        if (state == QAbstractSocket::UnconnectedState) { emit done(); }
    }
protected:
    QTcpSocket* socket;
    int descriptor;
};

// Connects to a client and sends data to it
class Sender : public QObject
{
    Q_OBJECT
public:
    Sender(const QString & address, quint16 port, QObject * parent = 0) :
        QObject(parent), socket(new QTcpSocket(this)),
        bytesInFlight(0), maxBytesInFlight(65536*8)
    {
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                SLOT(newState(QAbstractSocket::SocketState)));
        connect(socket, SIGNAL(bytesWritten(qint64)), SLOT(sentData(qint64)));
        socket->connectToHost(address, port);
        qDebug("Sender()");
    }
    ~Sender() { qDebug("~Sender()"); }
protected:
    // sends enough data to keep a maximum number of bytes in flight
    void sendData() {
        qint64 n = maxBytesInFlight - bytesInFlight;
        if (n <= 0) return;
        bytesInFlight += n;
        socket->write(QByteArray(n, 44)); // 44 is the answer, after all
    }
protected slots:
    void sentData(qint64 n) {
        bytesInFlight -= n;
        Q_ASSERT(bytesInFlight >= 0);
        sendData();
    }
    void newState(QAbstractSocket::SocketState state) {
        qDebug("sender new state %d", state);
        if (state == QAbstractSocket::ConnectedState) sendData();
    }
protected:
    QTcpSocket* socket;
    qint64 bytesInFlight;
    qint64 maxBytesInFlight;
};

// Keeps track of threads and client objects
class ThreadPool : public QTcpServer
{
    Q_OBJECT
public:
    ThreadPool(QObject* parent = 0) : QTcpServer(parent), nextThread(0) {
        for (int i=0; i < QThread::idealThreadCount(); ++i) {
            QThread * thread = new QThread(this);
            connect(thread, SIGNAL(finished()), SLOT(threadDone()));
            thread->start();
            threads << thread;
        }
        const QMetaObject & mo = Client::staticMetaObject;
        int idx = mo.indexOfMethod("newConnection(int)");
        Q_ASSERT(idx>=0);
        method = mo.method(idx);
    }
    void poolObject(QObject* obj) const {
        if (nextThread >= threads.count()) nextThread = 0;
        QThread* thread = threads.at(nextThread);
        obj->moveToThread(thread);
    }
protected:
    void incomingConnection(int descriptor) {
        Client * client;
        if (threads.isEmpty()) return;
        if (! clients.isEmpty()) {
            client = clients.dequeue();
        } else {
            client = new Client();
            connect(client, SIGNAL(done()), SLOT(clientDone()));
        }
        poolObject(client);
        method.invoke(client, Q_ARG(int, descriptor));
    }
signals:
    void finished();
public slots:
    void quit() {
        foreach (QThread * thread, threads) { thread->quit(); }
    }
private slots:
    void clientDone() {
        clients.removeAll(qobject_cast<Client*>(sender()));
    }
    void threadDone() {
        QThread * thread = qobject_cast<QThread*>(sender());
        if (threads.removeAll(thread)) delete thread;
        if (threads.isEmpty()) emit finished();
    }
private:
    QList<QThread*> threads;
    QQueue<Client*> clients;
    QMetaMethod method;
    mutable int nextThread;
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    ThreadPool server;
    if (!server.listen(QHostAddress::Any, 1101)) qCritical("cannot establish a listening server");
    const int senderCount = 10;
    Sender *prevSender = 0, *firstSender = 0;
    for (int i = 0; i < senderCount; ++ i) {
        Sender * sender = new Sender("localhost", server.serverPort());
        server.poolObject(sender);
        if (!firstSender) firstSender = sender;
        if (prevSender) sender->connect(prevSender, SIGNAL(destroyed()), SLOT(deleteLater()));
        prevSender = sender;
    }
    QTimer::singleShot(3000, firstSender, SLOT(deleteLater())); // run for 3s
    server.connect(prevSender, SIGNAL(destroyed()), SLOT(quit()));
    qApp->connect(&server, SIGNAL(finished()), SLOT(quit()));
    // Deletion chain: timeout deletes first sender, then subsequent senders are deleted,
    // finally the last sender tells the thread pool to quit. Finally, the thread pool
    // quits the application.
    return a.exec();
}

#include "main.moc"

根据您的解释,您的游戏引擎将启动并创建与 localhost 上某个端口的连接。您的 Qt 程序应该在端口 1101 上接受该连接,接收一些字符串,处理它们,然后将它们发回。

修改代码以接受固定端口号上的连接。所有的数据处理,包括发回响应,都必须从newData()插槽中完成。如果您的计算非常复杂,您还可以将该数据传递给不同的线程。我所说的复杂是指数以万计的操作,例如加法和乘法,或数以千计的三角操作。

Sender课程只是作为示例。当然,您的游戏引擎会执行发送,因此您不需要 Sender 类。

于 2012-06-11T11:21:20.697 回答
0

我让我的旧“错误的方法”代码工作。我猜这部分是错误所在:

//removed
tcpSocket.disconnectFromHost();
tcpSocket.waitForDisconnected();
emit finished();

...

#include <stdlib.h>
#include <QtNetwork>
#include "MeshServer.hh"
#include <iostream>
#include "TableView.hh"

using namespace  std;

FortuneServer::FortuneServer(QObject *parent)
: QTcpServer(parent)
{

}

void FortuneServer::procesServerString(string serverString){

emit procesServerStringToParent(serverString);

}
void FortuneServer::getStringToThread(string serverString){

emit getStringToThreadSignal(serverString);

}

void FortuneServer::incomingConnection(int socketDescriptor)
{


FortuneThread *serverthread = new FortuneThread(socketDescriptor, this);
//connect(&serverthread, SIGNAL(finished()), &serverthread, SLOT(deleteLater()));


QThread* thread = new QThread;

serverthread->moveToThread(thread);




connect(serverthread, SIGNAL(fromThreadString(string)), this, SLOT(procesServerString(string)));
connect(this, SIGNAL(getStringToThreadSignal(string)), serverthread, SLOT(sendString(string)));
connect(serverthread, SIGNAL(finished()), thread, SLOT(quit()));
connect(serverthread, SIGNAL(finished()), serverthread, SLOT(deleteLater()));
connect(serverthread, SIGNAL(finished()), thread, SLOT(deleteLater()));

thread->start();

}



FortuneThread::FortuneThread(int socketDescriptor, QObject *parent): QObject(parent),   socketDescriptor(socketDescriptor)

{
if (!tcpSocket.setSocketDescriptor(socketDescriptor)) {
    emit error(tcpSocket.error());
    cout<<"socket error"<<endl;
    emit finished();
    return;
}




connect(&tcpSocket, SIGNAL(readyRead()), this, SLOT(getString()));
//connect(&tcpSocket, SIGNAL(disconnected()), this, SLOT(ondisconnected()));

}

void FortuneThread::getString()
{

int joj = tcpSocket.bytesAvailable();
if(joj==0){
    tcpSocket.disconnectFromHost();
    emit finished();
    return;
}
char inbuffer[1024];
int buffer_len = 1024;
memset(inbuffer, '\0', buffer_len);
tcpSocket.read(inbuffer,1024);
string instring;
instring = inbuffer;
instring.resize(joj);

emit fromThreadString(instring);

}   


void FortuneThread::sendString(string sendoutString)
{   
char buffer2[1024];
int buffer_len = 1024;
memset(buffer2, '\0', buffer_len);
strcat(buffer2,sendoutString.c_str());

tcpSocket.write(buffer2,buffer_len);

 }

void FortuneThread::ondisconnected()
{


emit finished();


}
于 2012-06-13T15:25:56.757 回答