0

我设置了一个流程来识别文件何时被放入目录。接下来我需要运行一个处理文件的 Bash 脚本(相当密集的处理)。该脚本抓取一个 PDF,创建一个临时目录,将 PDF 分解为单独的 PNG 文件,对每个图像运行 OCR 处理器,将结果转换为单页 PDF,然后将所有 PDF 合并为一个多页 PDF来自 OCR 的文本层。

问题是,在触发 10 个并发转换后,Bash 脚本会阻塞。现在我让 Mule ESB 监听新文件,然后触发每个文件的脚本,传递适当的参数。不幸的是,Mule 有两个任务,监听 -> 触发。我们将在该目录中有超过 200 个文件需要排队等待处理,最好一次处理 5 个。如何让 Mule 限制触发的并发进程数?

以下是我最初的流程草案:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:cxf="http://www.mulesoft.org/schema/mule/cxf" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.3.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/cxf http://www.mulesoft.org/schema/mule/cxf/current/mule-cxf.xsd 
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd ">
    <configuration>
    <default-threading-profile doThreading="false"/>
  </configuration>

    <queued-asynchronous-processing-strategy name="limitThreads" maxThreads="2"/>

    <flow name="Poll_DirectoryFlow1" doc:name="Poll_DirectoryFlow1" processingStrategy="limitThreads">
        <file:inbound-endpoint path="/home/administrator/Downloads/Input" responseTimeout="10000" doc:name="File" pollingFrequency="5000" fileAge="5000">

        </file:inbound-endpoint>
        <scripting:component doc:name="Script">
            <scripting:script engine="Groovy">
                <property key="originalFilename" value="#[header:originalFilename]"/>
                <scripting:text><![CDATA[def filename = message.getInboundProperty('originalFilename')
                                                        println "$filename"
                                                        def directory = message.getInboundProperty('directory')
                                                        println "$directory"
                                                        "mkdir processed".execute()
                                                        def command = ["/home/administrator/ocr.sh", "$directory/$filename", "/home/administrator/Downloads/Output/$filename"]
                                                        println "$command"
                                                        def proc = "pwd".execute()
                                                        command.execute()
                                                        proc.waitFor()
                                                        println "${proc.in.text}"]]></scripting:text>
            </scripting:script>
        </scripting:component>
        <echo-component doc:name="Echo"/>        
    </flow>
</mule>

这是实际的 Bash 脚本(提供了一些关于我们正在使用的工具的提示):

#!/bin/bash

#Setting variables
PARAM=$#
TMPDIR=./split
INFILENAME=${1##*/}
OUTFILENAME=${2##*/}
echo "1 is $1"
echo "2 is $2"
echo "infilename is $INFILENAME"
echo "outfilename is $OUTFILENAME"

#Logging I/O filenames
echo "infile: $1" >> error.log
echo "outfile: $2" >> error.log

#If the temporary directory doesn't exist, make it
if [ ! -d "$TMPDIR" ];
then
    mkdir $TMPDIR
fi

#Check to see that the correct number of params have been passed.
if [[ $PARAM -lt 2 ]];
then
    echo "Usage: $0 source.pdf output.pdf"
    echo "output.pdf is the desired output file"
    echo "source.pdf is a file to be OCR'd"
    exit 1
fi

#Make sure the input file is a PDF
if [ "${1##*.}" == "pdf" ];
then
    multilayer=false

    #Check to see if the input file is a multi-layered pdf with searchable text
        if grep -Fl "Font" "$1"; then multilayer=true; fi

    #If it's not multi-layered, then perform the OCR
    if [ "$multilayer" == "false" ];
    then
        mkdir $TMPDIR/"$INFILENAME/"
        echo "making temporary directory $TMPDIR/$INFILENAME"
        #Split the PDF into pdf's of one page per df in a temporary directory
        pdftk "$1" burst output "$TMPDIR/$INFILENAME/pg_%04d.pdf"
        echo "burse output to $TMPDIR/$INFILENAME/pg_%04d.pdf"
        mv "$1" processed/
        for files in "$TMPDIR/$INFILENAME/"*
            do
            echo "$files"
                    filename=$(basename "$files")
                    filename="${filename%.*}"

            #Convert the pdf page into an image
                    gs -r300 -o "$TMPDIR/$INFILENAME/$filename.jpeg" -sDEVICE=jpeg "$TMPDIR/$INFILENAME/$filename.pdf"

            #Perform the OCR against the image
                    tesseract "$TMPDIR/$INFILENAME/$filename.jpeg" "$TMPDIR/$INFILENAME/$filename" hocr

            #Combine the OCR'd image and OCR'd text into a multi-layer PDF file of that page
                    hocr2pdf -i "$TMPDIR/$INFILENAME/$filename.jpeg" -o "$TMPDIR/$INFILENAME/$filename.pdf" < "$TMPDIR/$INFILENAME/$filename.html"
                    compressed="$filename-compressed.pdf"

            #Compress the multi-layered PDF of the page
                    gs -sDEVICE=pdfwrite -dCompatibilityLevel=1.4 -dNOPAUSE -dQUIET -dBATCH -sOutputFile="$TMPDIR/$INFILENAME/$compressed $TMPDIR/$INFILENAME/$filename.pdf"
                    mv "$TMPDIR/$INFILENAME/$compressed" "$TMPDIR/$INFILENAME/$filename"
            done

        #Concatenate all of the multiline PDF pages into a single PDF file
        pdftk "$TMPDIR/$INFILENAME/"*.pdf cat output "$OUTFILENAME"
        compressed="$OUTFILENAME-compressed.pdf"

        #Compress the multi-layered PDF
        gs -sDEVICE=pdfwrite -dCompatibilityLevel=1.4 -dNOPAUSE -dQUIET -dBATCH -sOutputFile="$compressed" "$OUTFILENAME"
        mv "$compressed" "$2"
        rm -rf "$TMPDIR/$INFILENAME"
    else
        echo "The input file is multi-layered"
        mv "$1" "$2"
    fi
else
    echo "Please enter a valid input pdf file"
    exit 2
fi
4

2 回答 2

1

解决您的问题的一个简单方法是不使用您正在设置的基于线程配置文件的策略,并将脚本组件替换为配置如下的池化 Java 组件:

<pooled-component class="org.mule.PooledComponent">
        <pooling-profile exhaustedAction="WHEN_EXHAUSTED_WAIT" maxActive="0" maxWait="-1"  initialisationPolicy="INITIALISE_NONE"/>
</pooled-component>

您应该将 bash 脚本的调用放在该组件中。你可以在这里找到关于它的文档

于 2012-10-24T01:23:59.110 回答
0

@genjosanzo ...您让我在考虑处理策略方面走上了正轨。这是最终工作的解决方案:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:cxf="http://www.mulesoft.org/schema/mule/cxf"
    xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
    xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.3.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/cxf http://www.mulesoft.org/schema/mule/cxf/current/mule-cxf.xsd 
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd ">

    <queued-asynchronous-processing-strategy
        name="limitThreads" maxThreads="7"
        doc:name="Queued Asynchronous Processing Strategy" />
    <flow name="Poll_DirectoryFlow1" doc:name="Poll_DirectoryFlow1"
        processingStrategy="limitThreads">
        <file:inbound-endpoint path="/home/administrator/Downloads/Input"
            responseTimeout="10000" doc:name="File" pollingFrequency="60000"
            fileAge="5000">
            <file:filename-regex-filter pattern="^.*\.(pdf)$"
                caseSensitive="false" />
        </file:inbound-endpoint>
        <scripting:component doc:name="Script">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[def filename = message.getInboundProperty('originalFilename')
                println "$filename"
                def directory = message.getInboundProperty('directory')
                println "$directory"
                "mkdir processed".execute()
                def command = ["/home/administrator/ocr.sh", "$directory/$filename", "/home/administrator/Downloads/Output/$filename"]
                println "$command"
                def cmd = command.execute()
                cmd.waitFor()
                println "$filename has completed processing"]]></scripting:text>
            </scripting:script>
        </scripting:component>
        <echo-component doc:name="Echo"/>
    </flow>
</mule>
于 2012-10-24T01:43:37.223 回答