问题标签 [luigi]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2224 浏览

python-3.x - Luigi LocalTarget 二进制文件

LocalTarget在项目的 Luigi 管道中编写二进制文件时遇到了麻烦。我在这里隔离了问题:

我尝试打开'w''wb'但我不断收到以下错误:

我正在使用 python 3.5.1,我的 luigi 版本是 2.1.1

0 投票
1 回答
960 浏览

python - 如何动态创建 Luigi 任务

我正在为 Luigi Tasks 构建一个包装器,但我遇到了一个障碍,Register该类实际上是一个 ABC 元类,并且在创建动态type.

以下代码或多或少是我用来开发动态类的代码。

但是,当我运行调用函数时,我得到PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed.

调用函数:

0 投票
1 回答
5114 浏览

python - 如何使用 spark-submit 和 pyspark 运行 luigi 任务

我有一个luigi python 任务,其中包括一些 pyspark 库。现在我想用 spark-submit 在 mesos 上提交这个任务。我应该怎么做才能运行它?下面是我的代码骨架:

如果没有 luigi,我将提交此任务作为以下命令行:

现在的问题是我如何才能提交包含 luigi 命令行的 luigi 任务,例如:

还有一个问题是,如果 my_module.py 有一个需要先完成的任务,我是否需要为它做更多的事情,或者只是设置为与当前命令行相同?

我非常感谢您对此的任何提示或建议。非常感谢。

0 投票
1 回答
4320 浏览

python - 如何使用 Luigi 处理输出

我试图掌握 luigi 的工作原理,我明白了,但实际实现有点困难;)这就是我所拥有的:

这失败了RuntimeError: Unfulfilled dependency at run time: OtherTask_3_5862334ee2。我认为我需要生成输出def output(self):来解决此问题\功能。而且我无法理解如何在不写入文件的情况下产生合理的输出,例如:

我已经尝试阅读文档,但我根本无法理解输出背后的概念。如果我只需要输出到屏幕怎么办。如果我需要将一个对象输出到另一个任务怎么办?谢谢!

0 投票
1 回答
1299 浏览

python - luigi 每任务重试策略

我在配置 luigi per-task retry-policy 时遇到问题。我已将全局 luigi.cfg 文件配置如下:

此外,它在 luigi 配置手册中指出编写任务如下:

足以覆盖 luigi.cfg 中指定的 luigi retry_count。但是,此设置根本不会影响运行。我设法创建了一个每次都失败的任务,只是为了测试,并且记录返回这个任务失败了 5 次(而不是 3 次)。

我认为我缺少一些基本的东西。

0 投票
1 回答
654 浏览

mongodb - 路易吉 Python 中的 MongoDB

我想知道是否有办法在 Luigi 中输出到 MongoDB。我在文档中看到它们支持文件(本地 FS、HDFS)、S3、PostgreSQL 但不支持 MongoDB。如果没有,有人能解释一下为什么不吗?也许拥有它是个坏主意?我想将数据存储在数据库中,因为这样我就可以通过查询来探索它。但是我使用的是 mongodb,我不想安装另一个数据库。我不需要关系数据库,因为我只使用数据库来存储和查询( NoSql )而没有关系,所以最好的选择是 mongodb。

基本上我需要一个任务来读取数据并将其保存在数据库中。然后下一个任务获取这个输出并处理数据。

任何建议、建议或澄清都非常受欢迎。谢谢!

0 投票
1 回答
968 浏览

ubuntu - 脚本在启动时未运行,systemd ubuntu

我目前正在尝试在我的 Ubuntu 服务器打开时启动 Luigid,我尝试了几种技术,包括 rc.local、cronjob(@reboot)、upstart、systemd,但它们似乎都没有工作。

我应该指出,如果我手动执行该命令,该命令运行良好,我只需要它在启动时运行。在这一点上,我真的不担心我用哪种方式让它工作,所以这里有一些我尝试过的东西 -

克朗:

用过的

并输入

系统化:

我在 /usr/bin 中有一个名为 luigid 的脚本,其中包含以下内容,它被标记为可执行文件,我已经尝试过使用和不使用“exit 0”的情况,担心可能需要正确的退出代码 -

和 /etc/systemd/system/ 中名为 luigid.service 的服务文件 -

我尝试过分叉(并在服务和 lugid 命令中指定 PIDfile,oneshot 和 simple 的类型都没有运气。

我已使用 -

它似乎尝试启动服务,因为使用检查状态

systemctl status luigid.service

节目

一定有一些明显的我遗漏的东西,让命令在启动时运行真的不是那么难!

0 投票
1 回答
1143 浏览

python - 在 luigi 中处理很多参数

在我的很多项目中,我使用luigi作为流水线工具。这让我想到用它来实现参数搜索。该标准luigi.file.LocalTarget有一种非常幼稚的方法来处理参数,这也显示在文档的示例中:

即,参数保存在文件名中。这使得检查某个参数组合是否已经计算变得容易。一旦任务的参数更复杂,这就会变得混乱。

这是参数搜索的一个非常简单的想法:

当然,在这个例子中,所有四个参数都可以编码在文件名中,但是不需要很多幻想,这种方法可以达到边界。例如,考虑类似数组的参数。

我的后续想法是将参数和结果存储在某种信封对象中,然后可以将其保存为目标。然后,文件名可以是第一次模糊搜索的参数的某种散列。

有信封班

然后是新的 Target,它增强了 LocalTarget 并能够检查信封内的所有参数是否匹配:

这里的问题是,使用这个目标会增加一些原本 luigi 旨在最小化的样板。我设置了一个新的基础任务

生成的EnvelopedSum任务将非常小:

此任务可以以与开始的任务相同的方式运行Sum

注意:这个如何封装 luigi-task-results 的示例实现远非稳定,而是更多地说明了我所说的封装结果和参数的含义。

我的问题是:难道没有更简单的方法来处理 luigi 中的许多复杂参数吗?

后续问题:有没有人考虑过保存已执行参数搜索的代码版本(和/或 subtaks 的包版本)的记录?

任何关于在哪里阅读有关此主题的评论也表示赞赏。

笔记:

您可能需要一些导入才能使其运行:

0 投票
2 回答
1247 浏览

postgresql - 近实时 ETL 架构的正确工具

我们有一个系统,其中我们的主要数据存储(和“通用数据源”)是 Postgres,但我们实时以及每晚汇总复制该数据。我们目前复制到 Elasticsearch、Redis、Redshift(仅限每晚),并且还在添加 Neo4j。

我们的 ETL 管道已经变得足够广泛,以至于我们开始研究AirflowLuigi等工具,但从我最初的研究中可以看出,这些工具几乎完全用于批量加载。

是否有任何工具可以处理既可以处理大批量 ETL 过程也可以处理动态、大容量、单个记录复制的 ETL 过程?Airflow 或 Luigi 会处理这个问题,而我只是错过了它吗?

谢谢!

0 投票
0 回答
381 浏览

python - Luigi 任务在主“ValueError:无法解码 JSON 对象”上失败

当我在 OS X El Cap 上运行没有 --local-scheduler 的任务时,我只会收到此错误。使用命令创建的守护进程luigid当前也在运行。

当我使用 --local-scheduler 运行任务时,它按预期运行。在我的 Windows 7 虚拟机上,无论有无 --local-scheduler,它都运行良好。好奇为什么它只在 OS X 上的那一种情况下失败。

代码片段来自我参考的这篇文章开始学习 Luigi:https ://marcobonzanini.com/2015/10/24/building-data-pipelines-with-python-and-luigi/

代码:

痕迹:

文件“scrape.py”,第 24 行,在 luigi.run()

文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/interface.py”,第 210 行,运行中 return _run(*args, **kwargs)['success']

文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/interface.py”,第 238 行,在 _run return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)

文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/interface.py”,第 194 行,在 _schedule_and_run success &= worker.add(t, env_params.parallel_scheduling)

文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/worker.py”,第 565 行,在 self._add(item, is_complete) 中添加下一个:

文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/worker.py”,第 682 行,在 _add retry_policy_dict=_get_retry_policy_dict(task),

_add_task self._scheduler.add_task(*args, **kwargs) 中的文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/worker.py”,第 441 行

rpc_func 中的文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/scheduler.py”,第 112 行 return self._request('/api/{}'.format(fn_name ),actual_args, **request_args)

_request response = json.loads(page)["response"] 中的文件“/Users/-/.virtualenvs/adwords/lib/python2.7/site-packages/luigi/rpc.py”,第 145 行

文件“/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/json/init .py ”,第 338 行,加载返回 _default_decoder.decode(s)

文件“/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/json/decoder.py”,第 366 行,在 decode obj 中,end = self.raw_decode(s, idx=_w(s , 0).end())

文件“/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/json/decoder.py”,第 384 行,在 raw_decode 中引发 ValueError(“无法解码 JSON 对象”)ValueError:否JSON对象可以被解码