1

我有两个用管道连接的进程。

一个进程有多个线程在管道上写入消息。

另一个进程读取管道并处理消息。

问题是,当进程读取管道时,它会一个接一个地获取所有消息。¿ 有没有办法一次只阅读一条消息?

起初我使用写和读函数,直接使用文件描述符。然后我尝试将它们视为文件,使用 fdopen、fread 和 fwrite,但它仍然同时读取所有数据。

消息的大小每次都会改变,所以我无法通过读取固定数量的字符来修复它。

4

1 回答 1

1

很久以前,在 POSIX 成为世界上任何地方的已知概念之前,至少有一个版本的 Unix 维护这样的东西:写入小于管道缓冲区中剩余空间大小的内容以对应的原子块读取写入管道的数据包的大小,受您尝试读取足够数据的约束。不幸的是(或者我的意思是“显然”),我无法再证明情况确实如此——我已经有超过 25 年的时间无法访问相关的硬件和 O/S。

然而,反例证明表明,Mac OS X 不再像那样处理读取端(尽管write()如果写入的请求大小足够小,POSIX 确实保证调用是原子的)。这对我来说是一个惊喜。

反例——代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

static void childish(int fd)
{
    char buffer[1024];
    int  nbytes;
    int  pid = getpid();
    while ((nbytes = read(fd, buffer, sizeof(buffer))) > 0)
    {
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void parental(int fd)
{
    char message[] = "The Quick Brown Fox Jumped Over The Lazy Dog";
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        int  nbytes = rand() % (sizeof(message) - 1);
        while (nbytes == 0)
            nbytes = rand() % (sizeof(message) - 1);
        if (write(fd, message, nbytes) != nbytes)
            break;
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, message);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

反例——数据

在 Mac OS X 10.8.3 上测试。

86504: 0043 <<The Quick Brown Fox Jumped Over The Lazy Do>>
86504: 0001 <<T>>
86504: 0033 <<The Quick Brown Fox Jumped Over T>>
86504: 0006 <<The Qu>>
86504: 0030 <<The Quick Brown Fox Jumped Ove>>
86504: 0036 <<The Quick Brown Fox Jumped Over The >>
86504: 0024 <<The Quick Brown Fox Jump>>
86504: 0022 <<The Quick Brown Fox Ju>>
86504: 0031 <<The Quick Brown Fox Jumped Over>>
86504: 0037 <<The Quick Brown Fox Jumped Over The L>>
86504: 0028 <<The Quick Brown Fox Jumped O>>
86504: 0017 <<The Quick Brown F>>
86504: 0032 <<The Quick Brown Fox Jumped Over >>
86504: 0038 <<The Quick Brown Fox Jumped Over The La>>
86504: 0019 <<The Quick Brown Fox>>
86504: 0007 <<The Qui>>
86504: 0023 <<The Quick Brown Fox Jum>>
86504: 0005 <<The Q>>
86504: 0020 <<The Quick Brown Fox >>
86504: 0004 <<The >>
86504: exiting
86505: 0456 <<The Quick Brown Fox Jumped Over The Lazy DoTThe Quick Brown Fox Jumped Over TThe QuThe Quick Brown Fox Jumped OveThe Quick Brown Fox Jumped Over The The Quick Brown Fox JumpThe Quick Brown Fox JuThe Quick Brown Fox Jumped OverThe Quick Brown Fox Jumped Over The LThe Quick Brown Fox Jumped OThe Quick Brown FThe Quick Brown Fox Jumped Over The Quick Brown Fox Jumped Over The LaThe Quick Brown FoxThe QuiThe Quick Brown Fox JumThe QThe Quick Brown Fox The >>
86505: exiting

原子写作——非原子阅读

此代码用于writev()将消息长度和消息写入管道。当然,它使用两次读取来获取数据,获取长度,然后获取消息。这适用于单个阅读器;对于多个阅读器,您必须在阅读器之间进行协调,以便一个阅读器不访问文件描述符,而另一个阅读器读取了长度但不读取数据。该代码使用writev()系统调用在单个系统调用中写入长度和数据。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

static void childish(int fd)
{
    int  nbytes;
    int  pid = getpid();
    while (read(fd, &nbytes, sizeof(nbytes)) == sizeof(nbytes))
    {
        char buffer[1024];
        int  actual;
        if ((actual = read(fd, buffer, nbytes)) != nbytes)
        {
            fprintf(stderr, "%.5d: short read (wanted %d, actual %d)\n", pid, nbytes, actual);
            break;
        }
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void parental(int fd)
{
    char message[] = "The Quick Brown Fox Jumped Over The Lazy Dog";
    int nbytes = 0;
    struct iovec req[2];
    req[0].iov_base = &nbytes;
    req[0].iov_len  = sizeof(nbytes);
    req[1].iov_base = message;
    req[1].iov_len  = 0;
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        do
        {
            nbytes = rand() % (sizeof(message) - 1);
        } while (nbytes == 0);
        req[1].iov_len = nbytes;
        if (writev(fd, req, 2) != (int)(nbytes + sizeof(nbytes)))
            break;
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, message);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

示例输出

86798: 0043 <<The Quick Brown Fox Jumped Over The Lazy Do>>
86798: 0001 <<T>>
86798: 0033 <<The Quick Brown Fox Jumped Over T>>
86798: 0006 <<The Qu>>
86798: 0030 <<The Quick Brown Fox Jumped Ove>>
86798: 0036 <<The Quick Brown Fox Jumped Over The >>
86798: 0024 <<The Quick Brown Fox Jump>>
86798: 0022 <<The Quick Brown Fox Ju>>
86798: 0031 <<The Quick Brown Fox Jumped Over>>
86798: 0037 <<The Quick Brown Fox Jumped Over The L>>
86798: 0028 <<The Quick Brown Fox Jumped O>>
86798: 0017 <<The Quick Brown F>>
86798: 0032 <<The Quick Brown Fox Jumped Over >>
86798: 0038 <<The Quick Brown Fox Jumped Over The La>>
86798: 0019 <<The Quick Brown Fox>>
86798: 0007 <<The Qui>>
86798: 0023 <<The Quick Brown Fox Jum>>
86798: 0005 <<The Q>>
86798: 0020 <<The Quick Brown Fox >>
86798: 0004 <<The >>
86798: exiting
86799: 0043 <<The Quick Brown Fox Jumped Over The Lazy Do>>
86799: 0001 <<T>>
86799: 0033 <<The Quick Brown Fox Jumped Over T>>
86799: 0006 <<The Qu>>
86799: 0030 <<The Quick Brown Fox Jumped Ove>>
86799: 0036 <<The Quick Brown Fox Jumped Over The >>
86799: 0024 <<The Quick Brown Fox Jump>>
86799: 0022 <<The Quick Brown Fox Ju>>
86799: 0031 <<The Quick Brown Fox Jumped Over>>
86799: 0037 <<The Quick Brown Fox Jumped Over The L>>
86799: 0028 <<The Quick Brown Fox Jumped O>>
86799: 0017 <<The Quick Brown F>>
86799: 0032 <<The Quick Brown Fox Jumped Over >>
86799: 0038 <<The Quick Brown Fox Jumped Over The La>>
86799: 0019 <<The Quick Brown Fox>>
86799: 0007 <<The Qui>>
86799: 0023 <<The Quick Brown Fox Jum>>
86799: 0005 <<The Q>>
86799: 0020 <<The Quick Brown Fox >>
86799: 0004 <<The >>
86799: exiting

我对这个例子中的进程严格连续运行感到失望。由于这是多核机器,这不是我所期望的。当我将循环限制从 20 更改为 2000 时,我得到了交错执行,并且数据在发送和接收端保持同步。

我使用 4 字节int值作为长度。显然,对于手头的数据,使用 1 字节就足够了unsigned char(哎呀,它甚至可以是 a,signed char因为字符串只有 44 个字符长)。

请注意,我没有播种rand()生成器,因此除了每次运行时的进程 ID 之外,输出都是确定性的。

还要注意,通过阅读 POSIX 规范,我并不能绝对确定这些writev()段是否保证被视为管道上的单个单元。如果不是这种情况,那么您将需要在parental()代码中创建一个缓冲区,其中包含长度和相关数据量,然后使用普通write()调用。这一点也不难做到:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static void childish(int fd)
{
    char nbytes;
    int  pid = getpid();
    while (read(fd, &nbytes, sizeof(nbytes)) == sizeof(nbytes))
    {
        char buffer[1024];
        int  actual;
        if ((actual = read(fd, buffer, nbytes)) != nbytes)
        {
            fprintf(stderr, "%.5d: short read (wanted %d, actual %d)\n", pid, nbytes, actual);
            break;
        }
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void parental(int fd)
{
    char message[] = "\000The Quick Brown Fox Jumped Over The Lazy Dog";
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        int nbytes;
        do
        {
            nbytes = rand() % (sizeof(message) - 1);
        } while (nbytes == 0);
        message[0] = nbytes;
        if (write(fd, message, nbytes + 1) != (nbytes + 1))
            break;
        printf("%.5d: %.4d <<%.*s>>\n", pid, nbytes, nbytes, message+1);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

在编写代码中添加线程

创建线程来进行写作并不是那么难。代码仍然使用rand(),但rand()不能保证是线程安全的,所以它可能没有你想要的那么好。另一方面,这段代码只是rand()用来生成可变大小的消息;它是否完美运行并不重要。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static void childish(int fd)
{
    char nbytes;
    int  pid = getpid();
    while (read(fd, &nbytes, sizeof(nbytes)) == sizeof(nbytes))
    {
        char buffer[1024];
        int  actual;
        if ((actual = read(fd, buffer, nbytes)) != nbytes)
        {
            fprintf(stderr, "%.5d: short read (wanted %d, actual %d)\n", pid, nbytes, actual);
            break;
        }
        printf("%.5d: %.4d R <<%.*s>>\n", pid, nbytes, nbytes, buffer);
        fflush(stdout);
    }
    printf("%.5d: exiting\n", pid);
    exit(0);
}

static void *p_thread(void *data)
{
    int fd = *(int *)data;
    char message[] = "\000The Quick Brown Fox Jumped Over The Lazy Dog";
    int  pid = getpid();
    for (int i = 0; i < 20; i++)
    {
        int nbytes;
        do
        {
            nbytes = rand() % (sizeof(message) - 1);
        } while (nbytes == 0);
        message[0] = nbytes;
        if (write(fd, message, nbytes + 1) != (nbytes + 1))
            break;
        printf("%.5d: %.4d W <<%.*s>>\n", pid, nbytes, nbytes, message+1);
        fflush(stdout);
    }
    printf("%.5d: thread exiting\n", pid);
    return(0);
}

static void parental(int fd)
{
    enum { NUM_THREADS = 3 };
    pthread_t thr[NUM_THREADS];
    int  pid = getpid();
    for (int i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_create(&thr[i], 0, p_thread, (void *)&fd) != 0)
        {
            fprintf(stderr, "%.5d: failed to create thread number %d\n", pid, i);
            exit(EXIT_FAILURE);
        }
    }
    for (int i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_join(thr[i], 0) != 0)
        {
            fprintf(stderr, "%.5d: failed to join thread number %d\n", pid, i);
            exit(EXIT_FAILURE);
        }
    }
    printf("%.5d: master thread exiting\n", pid);
    exit(EXIT_SUCCESS);
}

int main(void)
{
    int fd[2];
    pipe(fd);
    pid_t pid = fork();
    if (pid < 0)
        fprintf(stderr, "failed to fork\n");
    else if (pid == 0)
    {
        close(fd[1]);
        childish(fd[0]);
    }
    else
    {
        close(fd[0]);
        parental(fd[1]);
    }
    return EXIT_FAILURE;  // Failed to fork
}

请注意p_thread(),线程函数几乎是前一个parental函数的副本,但新parental()函数协调了三个线程的创建和终止。childish()和中的代码main()根本不需要更改(尽管我R在打印中添加了childish()以匹配W中的代码p_thread())。

于 2013-05-12T18:56:02.673 回答