您的解决方案听起来很合理,除了一点。您提到您将创建一个线程来连接到每台机器,然后创建一个子线程来管理发送、接收和解析。我不明白你为什么需要创建一个子线程。您应该能够处理连接线程中的所有内容。还要考虑每个连接 1 个线程可能无法很好地扩展,如果此应用程序必须处理大量机器,则应避免每台机器一个线程。
甚至可以使用简单的线程池而不是每个连接 1 个线程来实现这一点,这不会很好地扩展。您可以考虑创建每 5 秒放入工作队列的任务,线程池将连接、读取、断开连接、解析和处理。假设这是 TCP/IP,您可能不应该保持连接打开,而是每次读取都连接/断开连接,类似于 HTTP。
这是一个 vc++ 线程池相关的问题。这里有一些更相关的信息。
另一种选择是使用libevent进行套接字通信。至于解析,还有其他库可以使用,例如Apache Thrift或JSon,它们都是开源的。这些解析库的缺点是您可能还必须修改透析机,这可能不是一个选项。如果您可以使用 Thrift 之类的东西,您可以从一个库中获取所有内容:套接字通信和解析。
以下是每个连接 1 个线程的简单情况的一些代码:
class ThreadInfo
{
public:
ThreadInfo(const string &ipAddress, uint16_t port) : ipAddress_(ipAddress), port_(port) {}
string getIpAddress() {return ipAddress_;}
uint16_t getPort() {return port_;}
string getRecvBuffer() {return recvBuffer_;}
private:
string ipAddress_;
uint16_t port_;
string recvBuffer_;
};
void *threadEntryPoint(void *userData)
{
ThreadInfo *threadInfo = (ThreadInfo*) userData;
// You need to decide if you want to keep the connection open while sleeping
// or open and close it for each transaction. Change code here accordingly.
// Create socket with threadInfo->getIpAddress() and threadInfo->getPort()
// while(1)
// Send request to each machine
// Get response from each machine and store in threadInfo->getRecvBuffer()
// The buffer could also be a local var in this function, decide accordingly
// parse data accordingly
// sleep 5 seconds
}
uint16_t getPort(int machineNum) { return 3456; }
string getIpAddress(int machineNum) { return string("192.168.1.2"); }
int main(int argc, char **argv)
{
// 3 items that we need, and that you will have to plugin accordingly:
// 1) Num threads, assuming 100 for now
// 2) IP address of each external machine, implement getIpAddress() accordingly
// 3) port of each machine, implement getPort() accordingly
int numThreads(100);
list<pthread_t> threadIdList;
for(int i = 0; i < numThreads; ++i)
{
pthread_t threadId;
ThreadInfo *threadInfo = new ThreadInfo(getIpAddress(i), getPort(i));
pthread_create(&threadId, NULL, threadEntryPoint, threadInfo);
threadIdList.push_back(threadId);
}
// Wait for the threads to finish
std::list<pthread_t>::iterator iter = threadIdList.begin();
while(iter != threadIdList.end())
{
pthread_t threadId = *iter++;
pthread_join(threadId, NULL);
}
}