3

我想不出任何在c中实现流水线的方法,它实际上会起作用。这就是我决定在这里写的原因。我不得不说,我了解 pipe/fork/mkfifo 是如何工作的。我见过很多实现 2-3 管道的例子。这简单。我的问题开始了,当我必须实现 shell 时,管道数是未知的。

我现在所拥有的:例如。

ls -al | tr a-z A-Z | tr A-Z a-z | tr a-z A-Z

我把这条线变成了这样的东西:

array[0] = {"ls", "-al", NULL"}
array[1] = {"tr", "a-z", "A-Z", NULL"}
array[2] = {"tr", "A-Z", "a-z", NULL"}
array[3] = {"tr", "a-z", "A-Z", NULL"}

所以我可以使用

execvp(array[0],array)

稍后的。

直到现在,我相信一切都好。当我尝试将这些功能的输入/输出重定向到彼此时,问题就开始了。

我是这样做的:

    mkfifo("queue", 0777);

    for (i = 0; i<= pipelines_count; i++) // eg. if there's 3 pipelines, there's 4 functions to execvp
    {
    int b = fork();             
    if (b == 0) // child
        {           
        int c = fork();

        if (c == 0) 
        // baby (younger than child) 
        // I use c process, to unblock desc_read and desc_writ for b process only
        // nothing executes in here
            {       
            if (i == 0) // 1st pipeline
                {
                int desc_read = open("queue", O_RDONLY);
                // dup2 here, so after closing there's still something that can read from 
                // from desc_read
                dup2(desc_read, 0); 
                close(desc_read);           
                }

            if (i == pipelines_count) // last pipeline
                {
                int desc_write = open("queue", O_WRONLY);
                dup2(desc_write, 0);
                close(desc_write);                              
                }

            if (i > 0 && i < pipelines_count) // pipeline somewhere inside
                {
                int desc_read = open("queue", O_RDONLY);
                int desc_write = open("queue", O_WRONLY);
                dup2(desc_write, 1);
                dup2(desc_read, 0);
                close(desc_write);
                close(desc_read);
                }               
            exit(0); // closing every connection between process c and pipeline             
            }
        else
        // b process here
        // in b process, i execvp commands
        {                       
        if (i == 0) // 1st pipeline (changing stdout only)
            {   
            int desc_write = open("queue", O_WRONLY);               
            dup2(desc_write, 1); // changing stdout -> pdesc[1]
            close(desc_write);                  
            }

        if (i == pipelines_count) // last pipeline (changing stdin only)
            {   
            int desc_read = open("queue", O_RDONLY);                                    
            dup2(desc_read, 0); // changing stdin -> pdesc[0]   
            close(desc_read);           
            }

        if (i > 0 && i < pipelines_count) // pipeline somewhere inside
            {               
            int desc_write = open("queue", O_WRONLY);       
            dup2(desc_write, 1); // changing stdout -> pdesc[1]
            int desc_read = open("queue", O_RDONLY);                            
            dup2(desc_read, 0); // changing stdin -> pdesc[0]
            close(desc_write);
            close(desc_read);                               
            }

        wait(NULL); // it wait's until, process c is death                      
        execvp(array[0],array);         
        }
        }
    else // parent (waits for 1 sub command to be finished)
        {       
        wait(NULL);
        }       
    }

谢谢。

4

4 回答 4

6

Patryk,您为什么要使用 fifo,而且管道的每个阶段都使用相同的 fifo?

在我看来,每个阶段之间都需要一个管道。所以流程会是这样的:

Shell             ls               tr                tr
-----             ----             ----              ----
pipe(fds);
fork();  
close(fds[0]);    close(fds[1]);
                  dup2(fds[0],0); 
                  pipe(fds);
                  fork();         
                  close(fds[0]);   close(fds[1]);  
                  dup2(fds[1],1);  dup2(fds[0],0);
                  exex(...);       pipe(fds);
                                   fork();     
                                   close(fds[0]);     etc
                                   dup2(fds[1],1);
                                   exex(...);  

在每个分叉 shell(close、dup2、pipe 等)中运行的序列看起来像一个函数(获取所需进程的名称和参数)。请注意,exec在每个调用之前,shell 的一个分叉副本都在运行。

编辑:

帕特里克:

Also, is my thinking correct? Shall it work like that? (pseudocode): 
start_fork(ls) -> end_fork(ls) -> start_fork(tr) -> end_fork(tr) -> 
start_fork(tr) -> end_fork(tr) 

我不确定你所说的 start_fork 和 end_fork 是什么意思。您是否暗示在开始ls之前运行完成tr?这并不是上图的真正含义。您的 shell 在开始之前不会等待ls完成tr。它按顺序启动管道中的所有进程,为每个进程设置stdinstdout,以便将进程链接在一起,stdoutof lsto stdinof tr; stdouttrstdin一个tr。这就是 dup2 调用正在做的事情。

进程运行的顺序由操作系统(调度程序)决定,但很明显,如果tr运行并从空读取,stdin它必须等待(阻塞)直到前一个进程向管道写入一些内容。很有可能在从它的读取ls之前运行到完成,但同样有可能它不会。例如,如果链中的第一个命令是连续运行并在此过程中产生输出的东西,那么管道中的第二个命令将不时被安排来处理第一个命令沿管道发送的任何内容。trstdin

希望能澄清一点:-)

于 2012-11-11T14:29:42.310 回答
2

可能值得使用libpipeline。它负责您的所有工作,您甚至可以在管道中包含功能。

于 2012-11-11T19:29:55.437 回答
1

问题是你试图一次做所有事情。而是把它分成更小的步骤。

1)解析您的输入以ls -al |摆脱它。1a)从这里你知道你需要创建一个管道,将它移动到标准输出,然后开始ls -al。然后将管道移动到标准输入。当然还有更多,但你不必担心代码中的它。

2) 解析下一段得到tr a-z A-Z |. 只要您的 next-to-spawn 命令的输出通过管道传送到某处,就返回步骤 1a。

于 2012-11-11T12:50:07.047 回答
0

在 C 中实现流水线。最好的方法是什么?

这个问题有点老了,但这是一个从未提供过的答案。使用libpipeline。libpipeline 是一个管道操作库。用例是man页面维护人员之一,他们必须经常使用如下命令(并解决相关的操作系统错误):

zsoelim < input-file | tbl | nroff -mandoc -Tutf8

这是 libpipeline 的方式:

pipeline *p;
int status;

p = pipeline_new ();
pipeline_want_infile (p, "input-file");
pipeline_command_args (p, "zsoelim", NULL);
pipeline_command_args (p, "tbl", NULL);
pipeline_command_args (p, "nroff", "-mandoc", "-Tutf8", NULL);
status = pipeline_run (p);

libpipeline主页有更多示例。该库还包含在许多发行版中,包括 Arch、Debian、Fedora、Scratch 的 Linux和 Ubuntu。

于 2019-04-07T00:17:03.007 回答