问题标签 [luigi]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1411 浏览

postgresql - 使用 luigi 更新 Postgres 表

我刚开始使用luigi图书馆。我经常抓取网站并将任何新记录插入 Postgres 数据库。当我试图重写部分脚本以使用luigi时,我不清楚应该如何使用“标记表”

工作流程

  1. 抓取数据
  2. 查询数据库以检查新数据是否与旧数据不同。
  3. 如果是这样,请将新数据存储在同一个表中。

但是,使用 luigi'spostgres.CopyToTable时,如果表已经存在,则不会插入新数据。我想我应该使用表inserted中的列table_updates来确定应该插入哪些新数据,但我不清楚这个过程是什么样的,我在网上找不到任何明确的例子。

0 投票
0 回答
52 浏览

python-2.7 - IPython Tasks 数据库有哪些功能?

看看文档和谷歌: https ://ipyparallel.readthedocs.io/en/latest/db.html

功能的例子很少,例如:

有 Luigi 和 Airflow,但它们并不管理每个任务的代码源。IPython 任务管理器能否管理任务的调度以及代码源?

0 投票
1 回答
1331 浏览

python-3.x - 具有多个输入的 luigi 任务架构

我有许多泡菜文件,一个用于 2005 年到 2010 年之间的每个日期。每个文件都包含一个单词字典,其中包含该日期各自的频率。我还有一个“主文件”,其中包含整个期间的所有唯一单词。总共大约有500万字。

我需要获取所有这些数据并为每个单词生成一个 CSV 文件,每个日期将有一行。例如,例如文件some_word.txt

我在用 luigi 框架组织这个过程时遇到了麻烦。我当前的顶级任务需要一个单词,查找每个日期的相关频率并将结果存储在 CSV 文件中。我想我可以遍历我的主文件中的每个单词并使用该单词运行任务,但我估计这需要几个月甚至更长的时间。这是我简化版本的顶级AggregateTokenFreqs任务。

0 投票
1 回答
331 浏览

python - 全局变量在 Python Luigi Pipeline 中恢复为默认值

我有看起来像这样的python luigi管道代码。当我检查终端时,似乎程序调用了 A 类并设置了 dosHave = false,但在它运行 B 类之前,dosHave 恢复为真。

基本上,我想做'bar',但它一直在做'foo',因为全局变量不断恢复为true。

谁能解释为什么会这样?我几乎可以肯定这与 Luigi Pipeline 有关,而不是 Python 本身。

0 投票
1 回答
321 浏览

python - 我们可以限制 luigi 任务的吞吐量吗?

我们有一个 Luigi 任务,它从 3rd 方服务请求一条信息。我们限制了每分钟可以对该 API 调用执行的调用请求数。

有没有办法在每个任务的基础上指定调度程序每单位时间必须运行多少此类任务?

0 投票
0 回答
133 浏览

apache-spark - 在运行 SparkSubmitTask 之前删除 HdfsTarget

社区,我想HdfsTarget在运行SparkSubmitTask. 最佳做法是什么?到目前为止,我尝试了附加代码中提到的两个选项,但均未成功:

  1. HdfsTarget如果已经存在,则相关/必需的作业不会被执行
  2. 如果调用,任务将并行执行yield
0 投票
2 回答
939 浏览

python-3.5 - Luigi 任务方法执行顺序

Luigi 执行方法的顺序是什么(运行、输出、要求)。我知道需要运行作为检查任务 DAG 有效性的第一个检查,但不应该在 run() 之后运行输出?

我实际上是在尝试等待运行中的 kafka 消息,并基于该消息触发一堆其他任务并返回 LocalTarget。像这样:

但是,我收到一条错误消息:

例外:必须设置路径或 is_tmp

返回 LocalTarget(self.path)行。为什么 luigi 尝试执行 def output() 方法直到 def run() 完成?

0 投票
1 回答
1104 浏览

python - SQLAlchemy 无法将时间戳插入 MSSQL

sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', '[23000] [FreeTDS][SQL Server]无法将显式值插入时间戳列。将 INSERT 与列列表一起使用以排除时间戳列,或在时间戳列中插入一个 D EFAULT。(273) (SQLExecDirectW)') [SQL: 'INSERT INTO task_events (task_id, event_name, ts) OUTP UT inserted.id VALUES (?, ?, ?)'] [参数:( 54, 'RUNNING', datetime.datetime(2017, 1, 16, 18, 5, 55, 595066))]

该错误似乎与此有关:

You can't insert explicit values into a timestamp column. Timestamp values are unique binary numbers that are automatically generated.

SQLAlchemy 不知道这一点吗?是否有一些关键\设置让 SQLAlchemy 知道现在插入时间戳?

似乎是处理 SQLAlchemy 的 luigi 代码。

附言。这里的问题是 luigi 正在使用 SQLAlchemy,我不想修改 luigi 的代码,但是如果你可以在 luigi 方面提出一个简单的解决方案,那也可以。

0 投票
0 回答
136 浏览

shell - 将多个参数传递给管道中的外部程序

我正在尝试为 NGS 数据构建管道。

我制作了一个用于将命令传递给 shell 的小示例管道。示例管道有两个从 shell 调用的脚本,它们只是在许多数据帧(本例中为 10 个)中连接(sumtool.py)和乘以(multool.py)值。我的 wrapper(wrapper.py) 处理输入并按顺序传递运行脚本的命令。这是包装器中代码的相关部分:

这工作正常,但我希望能够一次为第一个脚本传递所有命令,所有数据帧等待它完成,然后为每个数据帧一次运行所有命令的第二个脚本。由于 Popen().wait() 等待每个命令需要更长的时间。

我尝试将 luigi 合并为一个解决方案,但我没有成功运行外部程序或尝试使用 luigi 传递多个 I/O。对此的任何提示表示赞赏。

我想象的另一个解决方案是一次单独传递样本,但我不确定如何将它放入 python(或任何其他语言)。这也将解决 luigi 的 I/O 问题。

谢谢

注意 1:这是我构建的一个小示例管道。我的主要目的是在管道中调用诸如 bwa、picard 之类的程序……我无法导入。

注意 2:我已经在子进程中使用 Popen。您可以在第 4 行和第 5 行之间找到它。

0 投票
0 回答
741 浏览

python - 多进程 Luigi 任务中的请求

我有一个简单的Luigi Elasticsearch 索引任务,它使用请求进行 GET并将响应推送到本地 ElasticSearch。另外,我做了第二个任务,调用了第一个任务,如下所示:

如果我在单个线程中运行 ManyRequests 任务,它工作正常。但是,如果我指定了多个工作人员(例如 --workers 4),进程将从 Elasticsearch 引发TransportError (index_already_exists_exception),并且它们不会正确完成。完成进程的数量是随机的,所以我认为这是由于 Elasticsearch 数据库中写入的一些冲突造成的。我必须以不同的方式实现 ManyRequests 吗?

任何帮助将不胜感激:)

这是我执行 ManyRequests --workers 4 时的控制台: