4

我正在尝试创建 Ooize 协调器。问题是我已经有暂存数据等待使用 oozie 进行处理。

想象一下这样的情况。

  1. 当前日期是:01.03.2013(2013年3月1日)

  2. 我确实有这些输入目录:

    /staging/landing/source/xvlr/2013/02/01/00(2013 年 2 月 1 日,一天的第一个小时) /staging/landing/source/xvlr/2013/02/01/01

    /staging/landing/source/xvlr/2013/02/01/02

    /staging/landing/source/xvlr/2013/02/01/03

    /staging/landing/source/xvlr/2013/02/01/04

    ……

    /登台/登陆/来源/xvlr/2013/02/28/00

    ...

    /登台/登陆/来源/xvlr/2013/02/28/23

我希望我的 oozie 协调器使用所有以前创建的着陆数据并产生这样的输出:

/masterdata/source/xvlr/2013/02/01/00 
/masterdata/source/xvlr/2013/02/01/01
/masterdata/source/xvlr/2013/02/01/02
/masterdata/source/xvlr/2013/02/01/03
/masterdata/source/xvlr/2013/02/01/04
....
/masterdata/source/xvlr/2013/02/28/00
...
/masterdata/source/xvlr/2013/02/28/23

然后我希望我的协调员每小时运行一次并为 masterdata 生成新的输出。

我如何使用协调器规范来做到这一点?这是我的协调员。它什么也不做。它确实达到了我需要的时间,然后等待。它不会开始工作。

请指教。

<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-03-07T05:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow"  xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <timeout>5</timeout>
        <concurrency>4</concurrency>
    </controls>

    <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>

    </datasets>

    <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>

    <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
            <configuration>
                <property>
                    <name>inputDir</name>
                    <value>${coord:dataIn('xvlrInputEvent')}</value>
                </property>
                <property>
                    <name>outputDir</name>
                    <value>${coord:dataOut('xvlrOutputEvent')}</value>
                </property>

            </configuration>

        </workflow>
    </action>
</coordinator-app>
4

1 回答 1

4

这是正确的解决方案(它可以工作几天:)))):

<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-03-07T16:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow"  xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <timeout>3</timeout>
        <concurrency>1</concurrency>
    </controls>

    <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>

    </datasets>

    <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>

    <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
            <configuration>
                <property>
                    <name>inputDir</name>
                    <value>${coord:dataIn('xvlrInputEvent')}</value>
                </property>
                <property>
                    <name>outputDir</name>
                    <value>${coord:dataOut('xvlrOutputEvent')}</value>
                </property>
            </configuration>

        </workflow>
    </action>
</coordinator-app>

它有什么作用?

  • 起初它确实开始于2013-03-07T16:35Z,因此所有以前
    收集的数据都已通过底层工作流(具有解析功能的 mr-job 调用)
    • 在使用“过去时间数据集”(数据集时间小于当前时间)时,工作流正在逐一运行:它确实消耗了 /pastdate/hour_00,然后它立即开始消耗 /pastdate/hour_01 等
    • 当协调器到达当前时间时,它开始每小时调用工作流(按设计:05:35、06:35... 23:35)。
    • 请参阅超时声明:我确实缺少数据集:例如,3 月 1 日的第 10 个小时没有数据。工作流确实等待了 3 分钟,然后死了。

问题已经解决了。

于 2013-03-13T06:34:01.110 回答