0

我有以下两个程序,一个充当读者,另一个充当作家。作者似乎只正确发送了大约 3/4 的数据以供读者阅读。有没有办法保证所有数据都被发送?我想我已经设置好它可以可靠地读取和写入,但它似乎仍然错过了 1/4 的数据。

这里是作者的来源

#define pipe "/tmp/testPipe"

using namespace std;

queue<string> sproutFeed;


ssize_t r_write(int fd, char *buf, size_t size) {
   char *bufp;
   size_t bytestowrite;
   ssize_t byteswritten;
   size_t totalbytes;

   for (bufp = buf, bytestowrite = size, totalbytes = 0;
        bytestowrite > 0;
        bufp += byteswritten, bytestowrite -= byteswritten) {
      byteswritten = write(fd, bufp, bytestowrite);
            if(errno == EPIPE)
            {
            signal(SIGPIPE,SIG_IGN);
            }
      if ((byteswritten) == -1 && (errno != EINTR))
         return -1;
      if (byteswritten == -1)
         byteswritten = 0;
      totalbytes += byteswritten;
   }
   return totalbytes;
}


void* sendData(void *thread_arg)
{

int fd, ret_val, count, numread;
string word;
char bufpipe[5];


ret_val = mkfifo(pipe, 0777); //make the sprout pipe

if (( ret_val == -1) && (errno != EEXIST)) 
{
    perror("Error creating named pipe");
    exit(1);
}   
while(1)
{
    if(!sproutFeed.empty())
    {
        string s;
        s.clear();
        s = sproutFeed.front();
        int sizeOfData = s.length();
        snprintf(bufpipe, 5, "%04d\0", sizeOfData); 
        char stringToSend[strlen(bufpipe) + sizeOfData +1];
        bzero(stringToSend, sizeof(stringToSend));                  
        strncpy(stringToSend,bufpipe, strlen(bufpipe));         
        strncat(stringToSend,s.c_str(),strlen(s.c_str()));
        strncat(stringToSend, "\0", strlen("\0"));                  
        int fullSize = strlen(stringToSend);            
        signal(SIGPIPE,SIG_IGN);

        fd = open(pipe,O_WRONLY);
        int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
        cout << errno << endl;
        if(errno == EPIPE)
        {
        signal(SIGPIPE,SIG_IGN);
        }

        if(numWrite != fullSize )
        {               
            signal(SIGPIPE,SIG_IGN);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
            close(fd);
        }
        else
        {
            signal(SIGPIPE,SIG_IGN);
            sproutFeed.pop();
            close(fd);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
        }                   
    }
    else
    {
        if(usleep(.0002) == -1)
        {
            perror("sleeping error\n");
        }
    }
}

}

int main(int argc, char *argv[])
{
    signal(SIGPIPE,SIG_IGN);
    int x;
    for(x = 0; x < 100; x++)
    {
        sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
    }
    int rc, i , status;
    pthread_t threads[1];       
    printf("Starting Threads...\n");
    pthread_create(&threads[0], NULL, sendData, NULL);
    rc = pthread_join(threads[0], (void **) &status);

}

这是读者的来源

#define pipe "/tmp/testPipe"

char dataString[50000];
using namespace std;
char *getSproutItem();

void* readItem(void *thread_arg)
{
    while(1)
    {
        x++;
        char *s = getSproutItem();
        if(s != NULL)
        {
            cout << "READ IN: " << s << endl;
        }
    }
}


ssize_t r_read(int fd, char *buf, size_t size) {
   ssize_t retval;
   while (retval = read(fd, buf, size), retval == -1 && errno == EINTR) ;
   return retval;
}


char * getSproutItem()
{
    cout << "Getting item" << endl;
    char stringSize[4];
    bzero(stringSize, sizeof(stringSize));
    int fd = open(pipe,O_RDONLY);
    cout << "Reading" << endl;

    int numread = r_read(fd,stringSize, sizeof(stringSize));


    if(errno == EPIPE)
    {
        signal(SIGPIPE,SIG_IGN);

    }
    cout << "Read Complete" << endl;

    if(numread > 1)
    {

        stringSize[numread] = '\0'; 
        int length = atoi(stringSize);
        char recievedString[length];
        bzero(recievedString, sizeof(recievedString));
        int numread1 = r_read(fd, recievedString, sizeof(recievedString));
        if(errno == EPIPE)
        {


signal(SIGPIPE,SIG_IGN);
    }       
    if(numread1 > 1)
    {
        recievedString[numread1] = '\0';
        cout << "DATA RECIEVED: " << recievedString << endl;
        bzero(dataString, sizeof(dataString));
        strncpy(dataString, recievedString, strlen(recievedString));
        strncat(dataString, "\0", strlen("\0"));
        close(fd);  
        return dataString;
    }
    else
    {
        return NULL;
    }

}
else
{
    return NULL;
}

close(fd);

}

int main(int argc, char *argv[])
{
        int rc, i , status;
        pthread_t threads[1];       
        printf("Starting Threads...\n");
        pthread_create(&threads[0], NULL, readItem, NULL);
        rc = pthread_join(threads[0], (void **) &status); 

}
4

4 回答 4

4

您肯定以错误的方式使用信号。这里完全不需要线程——至少在提供的代码中是这样。字符串计算很奇怪。拿到这本书,在你读完之前不要触摸键盘:)

于 2009-05-25T04:35:54.347 回答
0

很难说这里发生了什么。也许您的系统调用之一返回了错误?您确定您已成功发送所有数据吗?

您在这里似乎也有一些无效代码:

    int length = atoi(stringSize);
    char recievedString[length];

这是一个语法错误,因为您不能使用非常量表达式在堆栈上创建数组的大小。也许您在真实版本中使用了不同的代码?

您需要循环读取数据吗?有时一个函数会返回一部分可用数据,并要求您重复调用它,直到所有数据都消失。

如果系统调用被中断,Unix 中的一些系统调用也可以返回 EAGAIN - 从事物的外观来看,您并没有处理这种情况。

于 2009-05-24T23:48:18.853 回答
0

用于通过命名管道发送数据的一般方法是在标头上附加有效负载的长度。然后你读(fd, header_len); 读取(rd,data_len);请注意,后者 read() 需要在循环中完成,直到读取 data_len 或 eof。另请注意,如果您有多个写入器到命名管道,那么写入是原子的(只要合理的大小)IE 多个写入器不会在内核缓冲区中处理部分消息。

于 2009-05-24T23:49:22.907 回答
0

您可能会被阅读器主线程中的 POSIX 线程信号处理语义所困扰。POSIX 标准允许 POSIX 线程接收信号,而不一定是您期望的线程。在不需要的地方阻止信号。 signal(SIG_PIPE,SIG_IGN)是你的朋友。向阅读器主添加一个。

POSIX 线程处理语义,将 POS 放入 POSIX。(但它确实更容易实现 POSIX 线程。)

用 ls 检查 /tmp 中的管道?不是空的吗?

于 2009-05-25T00:04:53.123 回答