0

我正在运行一个 EMR Spark 集群(使用 YARN),并且我正在直接从 EMR 主服务器运行 Luigi 任务。我有一个依赖于 S3 中数据的作业链,经过几次 SparkSubmitTasks 后,最终会出现在 Redshift 中。

import luigi
import luigi.format
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.redshift import RedshiftTarget


class SomeSparkTask(SparkSubmitTask):

    # Stored in /etc/luigi/client.cfg
    host = luigi.Parameter(default='host')
    database = luigi.Parameter(default='database')
    user = luigi.Parameter(default='user')
    password = luigi.Parameter(default='password')
    table = luigi.Parameter(default='table')

    <add-more-params-here>

    app = '<app-jar>.jar'
    entry_class = '<path-to-class>'

    def app_options(self):
        return <list-of-options>

    def output(self):
        return RedshiftTarget(host=self.host, database=self.database, user=self.user, password=self.password,
                              table=self.table, update_id=<some-unique-identifier>)

    def requires(self):
        return AnotherSparkSubmitTask(<params>)

我遇到了两个主要问题:

1) 有时 luigi 无法确定 SparkSubmitTask 何时完成 - 例如,我会看到 luigi 提交了一个作业,然后检查 YARN,它会说应用程序正在运行,但一旦完成,luigi 就会挂起并且无法确定工作是否完成。

2) 如果由于某种原因 SparkSubmitTasks 能够运行并且我放置在上面的任务完成了 Spark 作业,则输出任务永远不会运行,并且标记表永远不会创建或填充。但是,实际表是在运行的 Spark 作业中创建的。我是否误解了我应该如何调用 RedshiftTarget?

与此同时,我正在尝试熟悉源代码。

谢谢!

4

1 回答 1

1

放弃在我的 Spark 应用程序中使用 Luigi,因为我的所有数据现在都流式传输到 S3 中,我只需要一个大型单体应用程序来运行我的所有 Spark 聚合,这样我就可以利用中间结果/缓存。

于 2016-02-23T22:36:27.863 回答