2

第一次进入 Luigi(和 Python!)领域并有一些问题。相关代码为:

from Database import Database
import luigi

class bbSanityCheck(luigi.Task):

  conn = luigi.Parameter()
  date = luigi.Parameter()
  def __init__(self, *args, **kwargs):
    super(bbSanityCheck, self).__init__(*args, **kwargs)
    self.has_run = False

  def run(self):
    print "Entering run of bb sanity check"
    # DB STUFF HERE THAT DOESN"T MATTER
   print "Are we in la-la land?"

  def complete(self):
    print "BB Sanity check being asked for completeness: " , self.has_run
    return self.has_run

class Pipeline(luigi.Task):
  date = luigi.DateParameter()

  def requires(self):
    db = Database('cbs')
    self.conn = db.connect()
    print "I'm about to yield!"
    return bbSanityCheck(conn = self.conn, date = self.date)


  def run(self):
    print "Hello World"
    self.conn.query("""SELECT * 
              FROM log_blackbook""")
    result = conn.store_result()

    print result.fetch_row()

  def complete(self):
    return False

if __name__=='__main__':
  luigi.run()

输出在这里(删除了相关数据库返回的原因):

DEBUG: Checking if Pipeline(date=2013-03-03) is complete
I'm about to yield!
INFO: Scheduled Pipeline(date=2013-03-03)
I'm about to yield!
DEBUG: Checking if bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03) is complete
BB Sanity check being asked for completeness:  False
INFO: Scheduled bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 5150] Running   bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
Entering run of bb sanity check
Are we in la-la land?
INFO: [pid 5150] Done      bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread

所以问题:

1.) 为什么“我即将屈服”会打印两次?

2.) 为什么从来没有打印出“hello world”?

3.) 什么是“1 个可能由其他工作人员运行的待处理任务”?

我更喜欢超级干净的输出,因为它更容易维护。我希望我能消除这些警告等价物。

我还注意到需要“产量”或“返回项目、项目 2、项目 3”。我读过关于产量并理解它。我没有得到的是哪种约定在这里被认为是优越的,或者它们是否是我对语言的新手没有得到的细微差异。

4

1 回答 1

5

我认为您误解了 luigi 的一般工作方式。

(1) 嗯.. 不确定。对我来说,在 INFO 和 DEBUG 中打印相同的东西看起来更像是一个问题

(2) 因此,您正在尝试运行依赖 bbSanityCheck 运行的 Pipeline。bbSanityCheck.complete() 永远不会返回 True,因为您从未在 bbSanityCheck 中将 has_run 设置为 True。所以 Pipeline 任务永远无法运行并输出 hello world,因为它的依赖永远不会完成。

(3) 那可能是因为你有这个待处理的任务(它实际上是管道)。但 Luigi 明白它不可能运行并关闭。

我个人不会使用 has_run 作为检查任务是否已运行的方法,而是检查该作业的结果是否存在。即,如果此作业对数据库执行某项操作,那么 complete() 应该检查预期的内容是否存在。

于 2014-11-27T12:46:20.787 回答