4

我需要使用来自 python的Luigi运行 Hadoop jar 作业。我搜索并找到了在 Luigi 中编写 mapper 和 reducer 的示例,但没有直接运行 Hadoop jar。

我需要运行直接编译的 Hadoop jar。我该怎么做?

4

1 回答 1

3

您需要使用luigi.contrib.hadoop_jar包(代码)。

特别是,您需要扩展HadoopJarJobTask. 例如,像这样:

from luigi.contrib.hadoop_jar import HadoopJarJobTask
from luigi.contrib.hdfs.target import HdfsTarget

class TextExtractorTask(HadoopJarJobTask):
    def output(self):
        return HdfsTarget('data/processed/')

    def jar(self):
        return 'jobfile.jar'

    def main(self):
        return 'com.ololo.HadoopJob'

    def args(self):
        return ['--param1', '1', '--param2', '2']

您还可以在工作流中包含使用 maven 构建 jar 文件:

import luigi
from luigi.contrib.hadoop_jar import HadoopJarJobTask
from luigi.contrib.hdfs.target import HdfsTarget
from luigi.file import LocalTarget

import subprocess
import os

class BuildJobTask(luigi.Task):
    def output(self):
        return LocalTarget('target/jobfile.jar')

    def run(self):
        subprocess.call(['mvn', 'clean', 'package', '-DskipTests'])

class YourHadoopTask(HadoopJarJobTask):
    def output(self):
        return HdfsTarget('data/processed/')

    def jar(self):
        return self.input().fn

    def main(self):
        return 'com.ololo.HadoopJob'

    def args(self):
        return ['--param1', '1', '--param2', '2']

    def requires(self):
        return BuildJobTask()
于 2015-11-03T12:03:51.770 回答