我正在使用 python 管道框架 luigi 和 scikit-learn 进行机器学习批处理作业,尤其是在 MiniBatchDictionaryLearning 模块中。但是当我使用多个进程执行时,它并没有像我预期的那样工作。我的代码是这样的。(这只是一个例子。)
import luigi
import numpy as np
class First(luigi.Task):
job_nums = 2
def requires(self):
return [Second(job_nums=n) for n in range(self.job_nums)]
def output(self):
return luigi.LocalTarget("end.txt")
def run(self):
with self.output().open("w") as out_:
pass
class Second(luigi.Task):
data = np.arange(288000)
job_nums = luigi.IntParameter()
def output(self):
return luigi.LocalTarget("./dict{0}.npy".format(self.job_nums))
def run(self):
from sklearn.decomposition import MiniBatchDictionaryLearning
dico = MiniBatchDictionaryLearning(n_components=144, n_jobs=1)
D = dico.fit(np.reshape(self.data, (1000, 288))).components_
print D
with self.output().open("w") as out_:
D.dump(out_)
当我使用单个进程执行此代码时,它可以工作。
$ PYTHONPATH="" luigi --module this_is_a_test First
DEBUG: Checking if First() is complete
DEBUG: Checking if Secound(job_nums=0) is complete
DEBUG: Checking if Secound(job_nums=1) is complete
~~ snip ~~
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
- 1 First()
- 2 Secound(job_nums=0,1)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
但是当我使用多个进程执行时,它不起作用,我得到了这个错误。
$ PYTHONPATH="" luigi --module this_is_a_test First --workers 2
DEBUG: Checking if First() is complete
DEBUG: Checking if Secound(job_nums=0) is complete
DEBUG: Checking if Secound(job_nums=1) is complete
~~ snip ~~
DEBUG: 2 running tasks, waiting for next task to finish
INFO: [pid 18109] Worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072) running Secound(job_nums=0)
DEBUG: 2 running tasks, waiting for next task to finish
DEBUG: 2 running tasks, waiting for next task to finish
INFO: Worker task Secound(job_nums=1) died unexpectedly with exit code -11
INFO: Worker task Secound(job_nums=0) died unexpectedly with exit code -11
INFO: Informed scheduler that task Secound(job_nums=1) has status FAILED
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Secound(job_nums=0) is currently run by worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072)
INFO: Worker task Secound(job_nums=0) died unexpectedly with exit code -11
INFO: Informed scheduler that task Secound(job_nums=0) has status FAILED
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: There are 1 pending tasks unique to this worker
INFO: Worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 2 failed:
- 2 Secound(job_nums=0,1)
* 1 were left pending, among these:
* 1 had failed dependencies:
- 1 First()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
我检查了堆栈跟踪,发现 scikit-learn fit 方法出了点问题,但我无法确切找出原因。
你能告诉我如何解决这个问题吗?