1

以下程序不输出任何内容,也不抛出任何错误。我是否遗漏了课堂上的run()方法形式的东西to_S3()

class to_S3(luigi.Task):

    #The class Mysql_to_tsv converts the data returned by a query on a Mysqldb and stores the data in a tsv in a local file.

    def requires(self):
        return [Mysql_to_tsv]

    def output(self):
        return luigi.S3Target("https://s3.amazonaws.com/bucket-name/luigi_attempt.tsv")

该类的output()方法Mysql_to_tsv()是:

def output(self):
        return luigi.LocalTarget('/Users/user/Desktop/Work/Luigi/test_data.tsv')

请帮助该任务的正确类实现。

4

2 回答 2

1

我最初想要将一些数据放入 S3 存储桶中。

因此,不需要一种output()方法来运行特定任务(例如:将数据转储到 S3 存储桶。)

它可以直接在run()方法中完成,并且output()可以用于检查标志或存在。

因此,正确的实现是:

class to_S3(luigi.Task):

    def requires(self):
        return [Mysql_to_csv()]


    def run(self):

        #Creating a connection
        access_key = ""
        access_secret = ""
        conn = S3Connection(access_key, access_secret)

        #Connecting to the bucket
        bucket_name = ""
        bucket = conn.get_bucket(bucket_name)

        #Setting up the keys
        k = Key(bucket)
        k.key = "sample1"
        k.set_contents_from_filename("../test_data.tsv")
于 2015-09-01T09:13:36.083 回答
0

是的,所有非外部的 luigi 任务都需要一个run()方法。

于 2015-08-26T07:41:52.170 回答