
Community, I want to clean up an HdfsTarget before running a SparkSubmitTask. What is the best practice? So far I have tried the two options marked (1) and (2) in the attached code (both attempts are sketched again after the code), but neither was successful:

  1. If the HdfsTarget already exists, the dependent/required task is not executed
  2. If yield is used, the tasks are executed in parallel
import luigi
import luigi.format
import luigi.contrib.hdfs
from luigi.contrib.spark import SparkSubmitTask


class CleanUp(luigi.Task):
    path = luigi.Parameter()

    def run(self):
        # Remove the output path (if present) so the Spark job can write it fresh
        target = luigi.contrib.hdfs.HdfsTarget(self.path, format=luigi.format.Gzip)
        if target.exists():
            target.remove(skip_trash=True)


class MySparkTask(SparkSubmitTask):
    out_path = luigi.Parameter()  # renamed so the parameter does not shadow the output() method

    driver_memory = '8g'
    executor_memory = '3g'
    num_executors = 5

    app = 'my-app.jar'
    entry_class = 'com.company.MyJob'

    def app_options(self):
        return ['/input', self.out_path]

    def requires(self):
        (1)  # option 1 went here (see sketch below)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.out_path, format=luigi.format.Gzip)


class RunAll(luigi.Task):
    '''Dummy task that triggers execution of the other tasks'''
    result_dir = '/output'

    def requires(self):
        (2)  # option 2 went here (see sketch below)
        return MySparkTask(self.result_dir)
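
For clarity, here is roughly what I put at the two placeholders. This is a sketch of my attempts, not working code; the exact path wiring is what I am unsure about:

# Attempt (1), inside MySparkTask:
class MySparkTask(SparkSubmitTask):
    ...
    def requires(self):
        # Once output() already exists, Luigi considers this task complete
        # and never schedules its requirements, so CleanUp is skipped.
        return CleanUp(path=self.out_path)

# Attempt (2), inside RunAll:
class RunAll(luigi.Task):
    ...
    def requires(self):
        # Both requirements are independent of each other, so Luigi may
        # schedule CleanUp and MySparkTask in parallel rather than in order.
        yield CleanUp(path=self.result_dir)
        yield MySparkTask(self.result_dir)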