15

在网上做了一些搜索,找到了使用命名管道的简单“教程”。但是,当我对后台作业做任何事情时,我似乎会丢失很多数据。

[[编辑:找到了一个更简单的解决方案,请参阅对帖子的回复。所以我提出的问题现在是学术性的——以防万一有人想要一个工作服务器]]

使用 Ubuntu 10.04 和 Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash,版本 4.1.5(1)-release (x86_64-pc-linux-gnu)。

我的 bash 功能是:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

我在后台运行它:

> jqs&
[1] 5336

现在我喂它:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

输出不一致。我经常没有得到所有成功的回声。我得到的新文本回声最多与成功回声一样多,有时更少。

如果我从“提要”中删除“&”,它似乎可以工作,但在读取输出之前我会被阻止。因此我想让子进程被阻塞,而不是主进程。

目的是编写一个简单的作业控制脚本,这样我最多可以并行运行 10 个作业,并将其余作业排队以供以后处理,但可靠地知道它们确实在运行。

完整的工作经理如下:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

打电话

jq <name> <max processes> <command>
jq abc 2 sleep 20

将启动一个进程。那部分工作正常。开始第二个,很好。一个接一个似乎工作正常。但是在循环中开始 10 似乎会丢失系统,就像上面更简单的示例一样。

任何关于我可以做些什么来解决这种明显的 IPC 数据丢失的提示将不胜感激。

问候,阿兰。

4

6 回答 6

27

您的问题是if以下陈述:

while true
do
    if read txt <"$pipe"
    ....
done

发生的事情是您的作业队列服务器每次在循环周围打开和关闭管道。这意味着一些客户端在尝试写入管道时遇到“管道损坏”错误 - 也就是说,管道的读取器在写入器打开后消失了。

要解决此问题,请更改服务器中的循环,为整个循环打开管道一次:

while true
do
    if read txt
    ....
done < "$pipe"

这样做,管道打开一次并保持打开状态。

您需要注意循环内运行的内容,因为循环内的所有处理都将标准输入附加到命名管道。您需要确保从其他地方重定向循环内所有进程的标准输入,否则它们可能会消耗管道中的数据。

编辑:现在的问题是当最后一个客户端关闭管道时您在读取时获得 EOF,您可以使用 jilles 方法复制文件描述符,或者您可以确保您也是客户端并保留写入端打开的管道:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

这将保持管道的写入端在 fd 3 上打开。同样的警告适用于这个文件描述符和标准输入。您将需要关闭它,以便任何子进程都不会继承它。它可能没有标准输入那么重要,但它会更干净。

于 2010-11-27T12:07:49.973 回答
8

正如在其他答案中所说,您需要始终保持先进先出,以避免丢失数据。

但是,一旦在 fifo 打开后所有写入者都离开了(所以有写入者),立即读取 return (并poll()返回POLLHUP)。清除此状态的唯一方法是重新打开 fifo。

POSIX 没有为此提供解决方案,但至少 Linux 和 FreeBSD 提供了解决方案:如果读取开始失败,请再次打开 fifo,同时保持原始描述符打开。这是因为在 Linux 和 FreeBSD 中,“挂起”状态对于特定的打开文件描述是本地的,而在 POSIX 中,它对于 fifo 是全局的。

这可以在这样的 shell 脚本中完成:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done
于 2010-11-27T21:17:45.707 回答
2

只是对于那些可能感兴趣的人,[[重新编辑]] 在 camh 和 jilles 的评论之后,这里有两个新版本的测试服务器脚本。

这两个版本现在都可以正常工作。

camh 的管道管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jille 的管道管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

感谢大家的帮助。

于 2010-11-27T22:39:11.180 回答
1

就像 camh 和丹尼斯威廉姆森说的不要打破管道。

现在我有更小的例子,直接在命令行上:

服务器:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

客户:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

可以将关键行替换为:

    (echo "Test-$i" > tst-fifo&);

发送到管道的所有客户端数据都会被读取,尽管客户端的选项二可能需要在读取所有数据之前启动服务器几次。

但是,尽管读取等待管道中的数据开始,但一旦数据被推送,它就会永远读取空字符串。

有什么办法可以阻止这一切?

再次感谢您的任何见解。

于 2010-11-27T17:38:05.833 回答
0

一方面,问题比我想象的要严重:现在,在我的更复杂的示例 (jq_manage) 中似乎存在这样一种情况,即从管道中一遍又一遍地读取相同的数据(即使没有写入新数据)给它)。

另一方面,我找到了一个简单的解决方案(根据丹尼斯的评论编辑):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

奇迹般有效。不涉及套接字或管道。简单的。

于 2010-11-27T08:37:31.493 回答
0

最多并行运行 10 个作业并将其余作业排队以供以后处理,但可靠地知道它们确实在运行

您可以使用 GNU Parallel 来做到这一点。您将不需要此脚本。

http://www.gnu.org/software/parallel/man.html#options

您可以设置 max-procs “作业槽数。最多并行运行 N 个作业。” 有一个选项可以设置您要使用的 CPU 内核数。您可以将已执行作业的列表保存到日志文件中,但这是一个测试版功能。

于 2013-05-02T16:38:59.833 回答