0

用例:

我有一个协调器,它将一个包含多个文件的目录传递给一个工作流。该工作流具有以下节点:

    java node 1 :   Reads the file, and does some json parsing gets some input values to below nodes. Done using <capture-output>.
    pig node 1  :   Does some action. Requires above input values from parsed json.
    pig node 2  :   Same as above
    pig node 3  : ................
    ..................

问题:

协调器将目录名称传递给工作流。我想做以下事情:

for every file in directory {
    java node 1 : get config from file X
    pig node 1  : ...............
    ..............
}

请提出一种我可以做到这一点的方法。

下面是协调员:

LAST_ONLY

    <datasets>
            <dataset name="input" frequency="${datasetFrequency}" initial-instance="${datasetInitialInstance}" timezone="UTC">
                    <uri-template>${nameNode}/user/${coord:user()}/alertcampaign/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
                    <done-flag></done-flag>
            </dataset>
    </datasets>

    <input-events>
            <data-in name="inputLogs1" dataset="input">
                    <instance>${coord:current(0)}</instance>
            </data-in>

    </input-events>

    <action>
       <workflow>
             <app-path>${nameNode}/user/${coord:user()}/test.xml</app-path>
                   <configuration>
                            <property>
                                    <name>wfInput</name>
                                    <value>${coord:dataIn('inputLogs1')}</value>
                            </property>
                    </configuration>
      </workflow>

4

1 回答 1