用例:
我有一个协调器,它将一个包含多个文件的目录传递给一个工作流。该工作流具有以下节点:
java node 1 : Reads the file, and does some json parsing gets some input values to below nodes. Done using <capture-output>.
pig node 1 : Does some action. Requires above input values from parsed json.
pig node 2 : Same as above
pig node 3 : ................
..................
问题:
协调器将目录名称传递给工作流。我想做以下事情:
for every file in directory {
java node 1 : get config from file X
pig node 1 : ...............
..............
}
请提出一种我可以做到这一点的方法。
下面是协调员:
LAST_ONLY
<datasets>
<dataset name="input" frequency="${datasetFrequency}" initial-instance="${datasetInitialInstance}" timezone="UTC">
<uri-template>${nameNode}/user/${coord:user()}/alertcampaign/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="inputLogs1" dataset="input">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${nameNode}/user/${coord:user()}/test.xml</app-path>
<configuration>
<property>
<name>wfInput</name>
<value>${coord:dataIn('inputLogs1')}</value>
</property>
</configuration>
</workflow>