3

背景

我目前正在开发一个类似于 Windows 的 select 函数,它不仅支持 SOCKET 句柄,还支持其他类型的可等待句柄。我的目标是等待标准控制台句柄,以便为curl testsuite提供选择功能。

相关程序可以在 curl git 仓库中找到:sockfilt.c

问题

是否可以在非基于 GUI 的控制台输入上等待数据可用?问题是WaitFor*方法不支持 PIPE 句柄,因此如果进程输入来自另一个进程,例如使用管道 | 则不支持 STDIN。cmd的功能。


以下示例程序说明了该问题:select_ws.c

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#include <windows.h>
#include <winsock2.h>
#include <malloc.h>

#include <conio.h>
#include <fcntl.h>

#define SET_SOCKERRNO(x)  (WSASetLastError((int)(x)))

typedef SOCKET curl_socket_t;

/*
 * select function with support for WINSOCK2 sockets and all
 * other handle types supported by WaitForMultipleObjectsEx.
 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms687028.aspx
 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms741572.aspx
 */
static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
                     fd_set *exceptfds, struct timeval *timeout)
{
  long networkevents;
  DWORD milliseconds, wait, idx, avail, events, inputs;
  WSAEVENT wsaevent, *wsaevents;
  WSANETWORKEVENTS wsanetevents;
  INPUT_RECORD *inputrecords;
  HANDLE handle, *handles;
  curl_socket_t sock, *fdarr, *wsasocks;
  int error, fds;
  DWORD nfd = 0, wsa = 0;
  int ret = 0;

  /* check if the input value is valid */
  if(nfds < 0) {
    SET_SOCKERRNO(EINVAL);
    return -1;
  }

  /* check if we got descriptors, sleep in case we got none */
  if(!nfds) {
    Sleep((timeout->tv_sec * 1000) + (timeout->tv_usec / 1000));
    return 0;
  }

  /* allocate internal array for the original input handles */
  fdarr = malloc(nfds * sizeof(curl_socket_t));
  if(fdarr == NULL) {
    SET_SOCKERRNO(ENOMEM);
    return -1;
  }

  /* allocate internal array for the internal event handles */
  handles = malloc(nfds * sizeof(HANDLE));
  if(handles == NULL) {
    SET_SOCKERRNO(ENOMEM);
    return -1;
  }

  /* allocate internal array for the internal socket handles */
  wsasocks = malloc(nfds * sizeof(curl_socket_t));
  if(wsasocks == NULL) {
    SET_SOCKERRNO(ENOMEM);
    return -1;
  }

  /* allocate internal array for the internal WINSOCK2 events */
  wsaevents = malloc(nfds * sizeof(WSAEVENT));
  if(wsaevents == NULL) {
    SET_SOCKERRNO(ENOMEM);
    return -1;
  }

  /* loop over the handles in the input descriptor sets */
  for(fds = 0; fds < nfds; fds++) {
    networkevents = 0;
    handles[nfd] = 0;

    if(FD_ISSET(fds, readfds))
      networkevents |= FD_READ|FD_ACCEPT|FD_CLOSE;

    if(FD_ISSET(fds, writefds))
      networkevents |= FD_WRITE|FD_CONNECT;

    if(FD_ISSET(fds, exceptfds))
      networkevents |= FD_OOB;

    /* only wait for events for which we actually care */
    if(networkevents) {
      fdarr[nfd] = (curl_socket_t)fds;
      if(fds == fileno(stdin)) {
        handles[nfd] = GetStdHandle(STD_INPUT_HANDLE);
      }
      else if(fds == fileno(stdout)) {
        handles[nfd] = GetStdHandle(STD_OUTPUT_HANDLE);
      }
      else if(fds == fileno(stderr)) {
        handles[nfd] = GetStdHandle(STD_ERROR_HANDLE);
      }
      else {
        wsaevent = WSACreateEvent();
        if(wsaevent != WSA_INVALID_EVENT) {
          error = WSAEventSelect(fds, wsaevent, networkevents);
          if(error != SOCKET_ERROR) {
            handles[nfd] = wsaevent;
            wsasocks[wsa] = (curl_socket_t)fds;
            wsaevents[wsa] = wsaevent;
            wsa++;
          }
          else {
            handles[nfd] = (HANDLE)fds;
            WSACloseEvent(wsaevent);
          }
        }
      }
      nfd++;
    }
  }

  /* convert struct timeval to milliseconds */
  if(timeout) {
    milliseconds = ((timeout->tv_sec * 1000) + (timeout->tv_usec / 1000));
  }
  else {
    milliseconds = INFINITE;
  }

  /* wait for one of the internal handles to trigger */
  wait = WaitForMultipleObjectsEx(nfd, handles, FALSE, milliseconds, FALSE);

  /* loop over the internal handles returned in the descriptors */
  for(idx = 0; idx < nfd; idx++) {
    fds = fdarr[idx];
    handle = handles[idx];
    sock = (curl_socket_t)fds;

    /* check if the current internal handle was triggered */
    if(wait != WAIT_FAILED && (wait - WAIT_OBJECT_0) >= idx &&
       WaitForSingleObjectEx(handle, 0, FALSE) == WAIT_OBJECT_0) {
      /* try to handle the event with STD* handle functions */
      if(fds == fileno(stdin)) {
        /* check if there is no data in the input buffer */
        if(!stdin->_cnt) {
          /* check if we are getting data from a PIPE */
          if(!GetConsoleMode(handle, &avail)) {
            /* check if there is no data from PIPE input */
            if(!PeekNamedPipe(handle, NULL, 0, NULL, &avail, NULL))
              avail = 0;
            if(!avail)
              FD_CLR(sock, readfds);
          } /* check if there is no data from keyboard input */
          else if (!_kbhit()) {
            /* check if there are INPUT_RECORDs in the input buffer */
            if(GetNumberOfConsoleInputEvents(handle, &events)) {
              if(events > 0) {
                /* remove INPUT_RECORDs from the input buffer */
                inputrecords = (INPUT_RECORD*)malloc(events *
                                                     sizeof(INPUT_RECORD));
                if(inputrecords) {
                  if(!ReadConsoleInput(handle, inputrecords,
                                       events, &inputs))
                    inputs = 0;
                  free(inputrecords);
                }

                /* check if we got all inputs, otherwise clear buffer */
                if(events != inputs)
                  FlushConsoleInputBuffer(handle);
              }
            }

            /* remove from descriptor set since there is no real data */
            FD_CLR(sock, readfds);
          }
        }

        /* stdin is never ready for write or exceptional */
        FD_CLR(sock, writefds);
        FD_CLR(sock, exceptfds);
      }
      else if(fds == fileno(stdout) || fds == fileno(stderr)) {
        /* stdout and stderr are never ready for read or exceptional */
        FD_CLR(sock, readfds);
        FD_CLR(sock, exceptfds);
      }
      else {
        /* try to handle the event with the WINSOCK2 functions */
        error = WSAEnumNetworkEvents(fds, NULL, &wsanetevents);
        if(error != SOCKET_ERROR) {
          /* remove from descriptor set if not ready for read/accept/close */
          if(!(wsanetevents.lNetworkEvents & (FD_READ|FD_ACCEPT|FD_CLOSE)))
            FD_CLR(sock, readfds);

          /* remove from descriptor set if not ready for write/connect */
          if(!(wsanetevents.lNetworkEvents & (FD_WRITE|FD_CONNECT)))
            FD_CLR(sock, writefds);

          /* remove from descriptor set if not exceptional */
          if(!(wsanetevents.lNetworkEvents & FD_OOB))
            FD_CLR(sock, exceptfds);
        }
      }

      /* check if the event has not been filtered using specific tests */
      if(FD_ISSET(sock, readfds) || FD_ISSET(sock, writefds) ||
         FD_ISSET(sock, exceptfds)) {
        ret++;
      }
    }
    else {
      /* remove from all descriptor sets since this handle did not trigger */
      FD_CLR(sock, readfds);
      FD_CLR(sock, writefds);
      FD_CLR(sock, exceptfds);
    }
  }

  for(idx = 0; idx < wsa; idx++) {
    WSAEventSelect(wsasocks[idx], NULL, 0);
    WSACloseEvent(wsaevents[idx]);
  }

  free(wsaevents);
  free(wsasocks);
  free(handles);
  free(fdarr);

  return ret;
}

int main(void)
{
  WORD wVersionRequested;
  WSADATA wsaData;
  SOCKET sock[4];
  struct sockaddr_in sockaddr[4];
  fd_set readfds;
  fd_set writefds;
  fd_set exceptfds;
  SOCKET maxfd = 0;
  int selfd = 0;
  void *buffer = malloc(1024);
  ssize_t nread;

  setmode(fileno(stdin), O_BINARY);

  wVersionRequested = MAKEWORD(2, 2);
  WSAStartup(wVersionRequested, &wsaData);

  sock[0] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr[0].sin_family = AF_INET;
  sockaddr[0].sin_addr.s_addr = inet_addr("74.125.134.26");
  sockaddr[0].sin_port = htons(25);
  connect(sock[0], (struct sockaddr *) &sockaddr[0], sizeof(sockaddr[0]));

  sock[1] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr[1].sin_family = AF_INET;
  sockaddr[1].sin_addr.s_addr = inet_addr("74.125.134.27");
  sockaddr[1].sin_port = htons(25);
  connect(sock[1], (struct sockaddr *) &sockaddr[1], sizeof(sockaddr[1]));

  sock[2] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr[2].sin_family = AF_INET;
  sockaddr[2].sin_addr.s_addr = inet_addr("127.0.0.1");
  sockaddr[2].sin_port = htons(1337);
  printf("bind = %d\n", bind(sock[2], (struct sockaddr *) &sockaddr[2], sizeof(sockaddr[2])));
  printf("listen = %d\n", listen(sock[2], 5));

  sock[3] = INVALID_SOCKET;

  while(1) {
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_ZERO(&exceptfds);

    FD_SET(sock[0], &readfds);
    FD_SET(sock[0], &exceptfds);
    maxfd = maxfd > sock[0] ? maxfd : sock[0];

    FD_SET(sock[1], &readfds);
    FD_SET(sock[1], &exceptfds);
    maxfd = maxfd > sock[1] ? maxfd : sock[1];

    FD_SET(sock[2], &readfds);
    FD_SET(sock[2], &exceptfds);
    maxfd = maxfd > sock[2] ? maxfd : sock[2];

    FD_SET((SOCKET)fileno(stdin), &readfds);
    maxfd = maxfd > (SOCKET)fileno(stdin) ? maxfd : (SOCKET)fileno(stdin);

    printf("maxfd = %d\n", maxfd);
    selfd = select_ws(maxfd + 1, &readfds, &writefds, &exceptfds, NULL);
    printf("selfd = %d\n", selfd);

    if(FD_ISSET(sock[0], &readfds)) {
      printf("read sock[0]\n");
      nread = recv(sock[0], buffer, 1024, 0);
      printf("read sock[0] = %d\n", nread);
    }
    if(FD_ISSET(sock[0], &exceptfds)) {
      printf("exception sock[0]\n");
    }

    if(FD_ISSET(sock[1], &readfds)) {
      printf("read sock[1]\n");
      nread = recv(sock[1], buffer, 1024, 0);
      printf("read sock[1] = %d\n", nread);
    }
    if(FD_ISSET(sock[1], &exceptfds)) {
      printf("exception sock[1]\n");
    }

    if(FD_ISSET(sock[2], &readfds)) {
      if(sock[3] != INVALID_SOCKET)
        closesocket(sock[3]);

      printf("accept sock[2] = %d\n", sock[2]);
      nread = sizeof(sockaddr[3]);
      printf("WSAGetLastError = %d\n", WSAGetLastError());
      sock[3] = accept(sock[2], (struct sockaddr *) &sockaddr[3], &nread);
      printf("WSAGetLastError = %d\n", WSAGetLastError());
      printf("accept sock[2] = %d\n", sock[3]);
    }
    if(FD_ISSET(sock[2], &exceptfds)) {
      printf("exception sock[2]\n");
    }

    if(FD_ISSET(fileno(stdin), &readfds)) {
      printf("read fileno(stdin)\n");
      nread = read(fileno(stdin), buffer, 1024);
      printf("read fileno(stdin) = %d\n", nread);
    }
  }

  WSACleanup();
  free(buffer);
}

通过以下命令使用 MinGW 进行编译:

mingw32-gcc select_ws.c -Wl,-lws2_32 -g -o select_ws.exe

使用以下命令直接从控制台运行程序有效:

select_ws.exe

但是对管道做同样的事情会不断地向 WaitForMultipleObjectsEx 发出信号:

ping -t 8.8.8.8 | select_ws.exe

在父进程完成之前,管道已准备好读取,例如:

ping 8.8.8.8 | select_ws.exe

是否有兼容的方法来模拟基于 PIPE 的控制台输入句柄与其他句柄的阻塞等待?应避免使用螺纹。

欢迎您对本要点中的示例程序做出更改。

提前致谢!

4

2 回答 2

1

用于GetStdHandle(STD_INPUT_HANDLE)获取 STDIN 管道句柄,然后使用其成员设置为手动重置事件的ReadFile/Ex()结构。然后,您可以使用任何函数来等待事件。如果超时,调用以中止读取操作。OVERLAPPEDhEventCreateEvent()WaitFor*()CancelIo()

于 2013-01-07T22:14:33.953 回答
1

我实际上找到了一种使用单独的等待线程使其工作的方法。请参阅 github.com 上 curl 存储库中的以下提交

感谢您的意见!

于 2014-01-26T00:14:35.563 回答