4

在我的一个解析程序中,我必须执行以下操作(在 C、C++、VC++ 中):-

我必须通过套接字编程连接 100 台机器(透析机)。向每台机器发送请求并接收来自机器的响应以及我将从机器获得的数据我必须解析该数据并将值写入文件中。我必须每 5 秒后向每台机器发送请求.

所以为了完成上述任务,我想这样做: -

我将从数据库中读取每台机器的ip和端口,创建一个线程连接到每台机器,并在每个线程中创建一个子线程(子线程),它将发送和接收并解析来自机器的数据每 5 秒(并将值写入 txt 文件)。我的解析功能将是通用的。

是否可行的解决方案。请帮助我。提前谢谢。

4

3 回答 3

4

您的解决方案听起来很合理,除了一点。您提到您将创建一个线程来连接到每台机器,然后创建一个子线程来管理发送、接收和解析。我不明白你为什么需要创建一个子线程。您应该能够处理连接线程中的所有内容。还要考虑每个连接 1 个线程可能无法很好地扩展,如果此应用程序必须处理大量机器,则应避免每台机器一个线程。

甚至可以使用简单的线程池而不是每个连接 1 个线程来实现这一点,这不会很好地扩展。您可以考虑创建每 5 秒放入工作队列的任务,线程池将连接、读取、断开连接、解析和处理。假设这是 TCP/IP,您可能不应该保持连接打开,而是每次读取都连接/断开连接,类似于 HTTP。

是一个 vc++ 线程池相关的问题。这里有一些更相关的信息

另一种选择是使用libevent进行套接字通信。至于解析,还有其他库可以使用,例如Apache ThriftJSon,它们都是开源的。这些解析库的缺点是您可能还必须修改透析机,这可能不是一个选项。如果您可以使用 Thrift 之类的东西,您可以从一个库中获取所有内容:套接字通信和解析。

以下是每个连接 1 个线程的简单情况的一些代码:

class ThreadInfo
{
public:
  ThreadInfo(const string &ipAddress, uint16_t port) : ipAddress_(ipAddress), port_(port) {}
  string getIpAddress() {return ipAddress_;}
  uint16_t getPort() {return port_;}
  string getRecvBuffer() {return recvBuffer_;}

private:
  string ipAddress_;
  uint16_t port_;
  string recvBuffer_;
};

void *threadEntryPoint(void *userData)
{
  ThreadInfo *threadInfo = (ThreadInfo*) userData;

  // You need to decide if you want to keep the connection open while sleeping
  // or open and close it for each transaction. Change code here accordingly.
  // Create socket with threadInfo->getIpAddress() and threadInfo->getPort()

  // while(1)
  //   Send request to each machine
  //   Get response from each machine and store in threadInfo->getRecvBuffer()
  //       The buffer could also be a local var in this function, decide accordingly
  //   parse data accordingly
  //   sleep 5 seconds
}

uint16_t getPort(int machineNum) { return 3456; }
string getIpAddress(int machineNum) { return string("192.168.1.2"); }
int main(int argc, char **argv)
{
   // 3 items that we need, and that you will have to plugin accordingly:
   //   1) Num threads, assuming 100 for now
   //   2) IP address of each external machine, implement getIpAddress() accordingly
   //   3) port of each machine, implement getPort() accordingly

   int numThreads(100);
   list<pthread_t> threadIdList;

   for(int i = 0; i < numThreads; ++i)
   {
      pthread_t threadId;
      ThreadInfo *threadInfo = new ThreadInfo(getIpAddress(i), getPort(i));
      pthread_create(&threadId, NULL, threadEntryPoint, threadInfo);
      threadIdList.push_back(threadId);
   }

   // Wait for the threads to finish
   std::list<pthread_t>::iterator iter = threadIdList.begin();
   while(iter != threadIdList.end())
   {
     pthread_t threadId = *iter++;
     pthread_join(threadId, NULL);
   }
}
于 2012-09-18T07:15:48.913 回答
1

对于 100 台机器,每 5 秒轮询一次,每台机器一个线程是合理的 - 大多数时间线程将在 Sleep(5000) 上阻塞,而在套接字 I/O 或磁盘 I/O 上阻塞大部分时间其余时间。对于这种加载(甚至是加载的五倍),我认为没有必要求助于异步 I/O 或线程池 - 为什么不必要地使事情复杂化?

正如@Brady 所指出的,我不明白为什么每个连接需要多个线程,假设您发布的要求是 - 每隔约 5 秒轮询一次并将回复写入文本文件。

我猜,(希望 :),5 秒间隔不是安全关键的实时要求,如果由于某些临时软件或网络延迟而偶尔为 6 秒,透析机将继续正常运行. 我不是血液科医生/肾脏科医生,但如果任何透析机可以对整体治疗做出任何重大改变(这需要数小时),如果投票/指示偶尔延迟一秒钟,我会感到惊讶。

编辑-重新。'解析函数并将数据写入文件函数对于所有线程都是通用的' - 应该没问题,假设每台机器都有不同的文本文件。如果所有日志都写入一个日志文件,那就更成问题了——每个日志条目实际上应该排到一个记录器线程的队列中,该线程单独写入日志文件。使用已经支持这种功能的现有的、经过验证的记录器框架将是最简单的解决方案。

于 2012-09-18T09:22:21.500 回答
0

如果你对网络和磁盘 IO 都使用boost::asio框架,你可能会得到比每台机器处理一个线程少得多的线程。

另外,它有一个很好的用于套接字编程的高级接口。

于 2012-09-18T07:44:51.867 回答