43

我有10多个任务要执行,系统限制最多可以同时运行4个任务。

我的任务可以像这样开始:myprog taskname

如何编写 bash shell 脚本来运行这些任务。最重要的是,当一个任务完成时,脚本可以立即启动另一个任务,使正在运行的任务计数始终保持 4。

4

12 回答 12

62

使用xargs

xargs -P <maximum-number-of-process-at-a-time> -n <arguments-per-process> <command>

详情在这里

于 2013-10-27T12:44:06.740 回答
32

我在考虑编写自己的进程池时偶然发现了这个线程,并且特别喜欢 Brandon Horsley 的解决方案,尽管我无法让信号正常工作,所以我从 Apache 中获得灵感并决定尝试一个带有 fifo 的 pre-fork 模型我的工作队列。

以下函数是工作进程在 fork 时运行的函数。

# \brief the worker function that is called when we fork off worker processes
# \param[in] id  the worker ID
# \param[in] job_queue  the fifo to read jobs from
# \param[in] result_log  the temporary log file to write exit codes to
function _job_pool_worker()
{
    local id=$1
    local job_queue=$2
    local result_log=$3
    local line=

    exec 7<> ${job_queue}
    while [[ "${line}" != "${job_pool_end_of_jobs}" && -e "${job_queue}" ]]; do
        # workers block on the exclusive lock to read the job queue
        flock --exclusive 7
        read line <${job_queue}
        flock --unlock 7
        # the worker should exit if it sees the end-of-job marker or run the
        # job otherwise and save its exit code to the result log.
        if [[ "${line}" == "${job_pool_end_of_jobs}" ]]; then
            # write it one more time for the next sibling so that everyone
            # will know we are exiting.
            echo "${line}" >&7
        else
            _job_pool_echo "### _job_pool_worker-${id}: ${line}"
            # run the job
            { ${line} ; } 
            # now check the exit code and prepend "ERROR" to the result log entry
            # which we will use to count errors and then strip out later.
            local result=$?
            local status=
            if [[ "${result}" != "0" ]]; then
                status=ERROR
            fi  
            # now write the error to the log, making sure multiple processes
            # don't trample over each other.
            exec 8<> ${result_log}
            flock --exclusive 8
            echo "${status}job_pool: exited ${result}: ${line}" >> ${result_log}
            flock --unlock 8
            exec 8>&-
            _job_pool_echo "### _job_pool_worker-${id}: exited ${result}: ${line}"
        fi  
    done
    exec 7>&-
}

你可以在 Github上获得我的解决方案的副本。这是使用我的实现的示例程序。

#!/bin/bash

. job_pool.sh

function foobar()
{
    # do something
    true
}   

# initialize the job pool to allow 3 parallel jobs and echo commands
job_pool_init 3 0

# run jobs
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run sleep 3
job_pool_run foobar
job_pool_run foobar
job_pool_run /bin/false

# wait until all jobs complete before continuing
job_pool_wait

# more jobs
job_pool_run /bin/false
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run foobar

# don't forget to shut down the job pool
job_pool_shutdown

# check the $job_pool_nerrors for the number of jobs that exited non-zero
echo "job_pool_nerrors: ${job_pool_nerrors}"

希望这可以帮助!

于 2012-08-09T02:50:42.507 回答
18

使用 GNU Parallel 你可以:

cat tasks | parallel -j4 myprog

如果你有 4 个核心,你甚至可以这样做:

cat tasks | parallel myprog

来自http://git.savannah.gnu.org/cgit/parallel.git/tree/README

完全安装

GNU Parallel 的完整安装非常简单:

./configure && make && make install

个人安装

如果您不是 root,您可以将 ~/bin 添加到您的路径并安装在 ~/bin 和 ~/share 中:

./configure --prefix=$HOME && make && make install

或者,如果您的系统缺少“make”,您可以简单地将 src/parallel src/sem src/niceload src/sql 复制到路径中的目录。

最少的安装

如果您只需要并行并且没有安装“make”(可能系统是旧的或 Microsoft Windows):

wget http://git.savannah.gnu.org/cgit/parallel.git/plain/src/parallel
chmod 755 parallel
cp parallel sem
mv parallel sem dir-in-your-$PATH/bin/

测试安装

在此之后,您应该能够:

parallel -j0 ping -nc 3 ::: foss.org.my gnu.org freenetproject.org

这将并行发送 3 个 ping 数据包到 3 个不同的主机,并在它们完成时打印输出。

观看介绍视频以进行快速介绍: https ://www.youtube.com/playlist?list=PL284C9FF2488BC6D1

于 2013-03-15T16:42:50.310 回答
5

我建议编写四个脚本,每个脚本依次执行一定数量的任务。然后编写另一个脚本,并行启动四个脚本。例如,如果您有脚本 script1.sh、script2.sh、script3.sh 和 script4.sh,您可以有一个名为 headscript.sh 的脚本,如下所示。

#!/bin/sh
./script1.sh & 
./script2.sh & 
./script3.sh & 
./script4.sh &
于 2011-06-22T14:32:17.170 回答
3

我找到了 A Foo Walks into a Bar 中提出的最佳解决方案...博客使用众所周知的xargs工具的内置功能首先创建一个文件commands.txt,其中包含您要执行的命令列表

myprog taskname1
myprog taskname2
myprog taskname3
myprog taskname4
...
myprog taskname123

然后像这样将其通过管道传输到 xargs 以在 4 个进程池中执行:

cat commands.txt | xargs -I CMD --max-procs=4 bash -c CMD

你可以修改进程号

于 2020-01-08T19:42:22.080 回答
2

遵循@Parag Sardas 的回答和此处链接的文档是您可能想要添加到.bash_aliases.

重新链接文档链接,因为它值得一读

#!/bin/bash
# https://stackoverflow.com/a/19618159
# https://stackoverflow.com/a/51861820
#
# Example file contents:
# touch /tmp/a.txt
# touch /tmp/b.txt

if [ "$#" -eq 0 ];  then
  echo "$0 <file> [max-procs=0]"
  exit 1
fi

FILE=${1}
MAX_PROCS=${2:-0}
cat $FILE | while read line; do printf "%q\n" "$line"; done | xargs --max-procs=$MAX_PROCS -I CMD bash -c CMD

./xargs-parallel.sh jobs.txt 4从jobs.txt读取最多4个进程

于 2018-08-15T15:31:59.060 回答
1

你可能可以用信号做一些聪明的事情。

请注意,这只是为了说明这个概念,因此没有经过彻底的测试。

#!/usr/local/bin/bash

this_pid="$$"
jobs_running=0
sleep_pid=

# Catch alarm signals to adjust the number of running jobs
trap 'decrement_jobs' SIGALRM

# When a job finishes, decrement the total and kill the sleep process
decrement_jobs()
{
  jobs_running=$(($jobs_running - 1))
  if [ -n "${sleep_pid}" ]
  then
    kill -s SIGKILL "${sleep_pid}"
    sleep_pid=
  fi
}

# Check to see if the max jobs are running, if so sleep until woken
launch_task()
{
  if [ ${jobs_running} -gt 3 ]
  then
    (
      while true
      do
        sleep 999
      done
    ) &
    sleep_pid=$!
    wait ${sleep_pid}
  fi

  # Launch the requested task, signalling the parent upon completion
  (
    "$@"
    kill -s SIGALRM "${this_pid}"
  ) &
  jobs_running=$((${jobs_running} + 1))
}

# Launch all of the tasks, this can be in a loop, etc.
launch_task task1
launch_task tast2
...
launch_task task99
于 2011-06-22T19:40:08.243 回答
1

这个经过测试的脚本一次运行 5 个作业,并会在它完成后立即重新启动一个新作业(由于在我们获得 SIGCHLD 时会终止 sleep 10.9。更简单的版本可以使用直接轮询(将 sleep 10.9 更改为睡眠 1 并摆脱陷阱)。

#!/usr/bin/bash

set -o monitor
trap "pkill -P $$ -f 'sleep 10\.9' >&/dev/null" SIGCHLD

totaljobs=15
numjobs=5
worktime=10
curjobs=0
declare -A pidlist

dojob()
{
  slot=$1
  time=$(echo "$RANDOM * 10 / 32768" | bc -l)
  echo Starting job $slot with args $time
  sleep $time &
  pidlist[$slot]=`jobs -p %%`
  curjobs=$(($curjobs + 1))
  totaljobs=$(($totaljobs - 1))
}

# start
while [ $curjobs -lt $numjobs -a $totaljobs -gt 0 ]
 do
  dojob $curjobs
 done

# Poll for jobs to die, restarting while we have them
while [ $totaljobs -gt 0 ]
 do
  for ((i=0;$i < $curjobs;i++))
   do
    if ! kill -0 ${pidlist[$i]} >&/dev/null
     then
      dojob $i
      break
     fi
   done
   sleep 10.9 >&/dev/null
 done
wait
于 2011-06-22T20:09:43.550 回答
0

关于 4 个 shell 脚本的其他答案并不完全让我满意,因为它假设所有任务都需要大约相同的时间,并且因为它需要手动设置。但这是我将如何改进它。

主脚本将按照特定的命名约定创建指向可执行文件的符号链接。例如,

ln -s executable1 ./01-task.01

第一个前缀用于排序,后缀标识批次 (01-04)。现在我们生成 4 个 shell 脚本,它们将批号作为输入并执行类似这样的操作

for t in $(ls ./*-task.$batch | sort ; do
   t
   rm t
done
于 2011-06-22T15:33:07.483 回答
0

看看我在 bash 中的作业池实现:https ://github.com/spektom/shell-utils/blob/master/jp.sh

例如,从大量 URL 下载时,最多运行 3 个 cURL 进程,您可以将 cURL 命令包装如下:

./jp.sh "My Download Pool" 3 curl http://site1/...
./jp.sh "My Download Pool" 3 curl http://site2/...
./jp.sh "My Download Pool" 3 curl http://site3/...
...
于 2013-12-17T15:02:50.987 回答
0

这是我的解决方案。这个想法很简单。我创建一个fifo信号量,其中每一行代表一个可用资源。当reading 队列时,如果没有剩余,主进程将阻塞。并且,我们在任务完成后通过简单地echo将任何东西放入队列来返回资源。

function task() {
    local task_no="$1"
    # doing the actual task...
    echo "Executing Task ${task_no}"
    # which takes a long time
    sleep 1
}

function execute_concurrently() {
    local tasks="$1"
    local ps_pool_size="$2"

    # create an anonymous fifo as a Semaphore
    local sema_fifo
    sema_fifo="$(mktemp -u)"
    mkfifo "${sema_fifo}"
    exec 3<>"${sema_fifo}"
    rm -f "${sema_fifo}"

    # every 'x' stands for an available resource
    for i in $(seq 1 "${ps_pool_size}"); do
        echo 'x' >&3
    done

    for task_no in $(seq 1 "${tasks}"); do
        read dummy <&3 # blocks util a resource is available
        (
            trap 'echo x >&3' EXIT # returns the resource on exit
            task "${task_no}"
        )&
    done
    wait # wait util all forked tasks have finished
}

execute_concurrently 10 4

上面的脚本将同时运行 10 个任务和 4 个任务。您可以将$(seq 1 "${tasks}")顺序更改为您要运行的实际任务队列。

于 2019-06-12T14:44:44.507 回答
0

我根据在 Bash 中编写进程池中介绍的方法进行了修改。

#!/bin/bash

#set -e   # this doesn't work here for some reason
POOL_SIZE=4   # number of workers running in parallel

#######################################################################
#                            populate jobs                            #
#######################################################################

declare -a jobs

for (( i = 1988; i < 2019; i++ )); do
    jobs+=($i)
done

echo '################################################'
echo '    Launching jobs'
echo '################################################'

parallel() {
    local proc procs jobs cur
    jobs=("$@")         # input jobs array
    declare -a procs=() # processes array
    cur=0               # current job idx

    morework=true
    while $morework; do
        # if process array size < pool size, try forking a new proc
        if [[ "${#procs[@]}" -lt "$POOL_SIZE" ]]; then
            if [[ $cur -lt "${#jobs[@]}" ]]; then
                proc=${jobs[$cur]}
                echo "JOB ID = $cur; JOB = $proc."

                ###############
                # do job here #
                ###############

                sleep 3 &

                # add to current running processes
                procs+=("$!")
                # move to the next job
                ((cur++))
            else
                morework=false
                continue
            fi
        fi

        for n in "${!procs[@]}"; do
            kill -0 "${procs[n]}" 2>/dev/null && continue
            # if process is not running anymore, remove from array
            unset procs[n]
        done
    done
    wait
}

parallel "${jobs[@]}"
于 2019-07-22T14:02:54.693 回答