问题标签 [luigi]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1858 浏览

python - 如何让我的 Luigi 调度程序使用带有并行调度标志的多个内核?

我的luigi.cfg文件中有以下行(在所有节点、调度程序和工作人员上):

然而,当我在我的 luigi 调度程序上监控 CPU 利用率时(大约 4000 个任务的图表,处理来自大约 100 个工作人员的请求),它只使用调度程序上的一个内核,luigid单线程通常达到 100% 的 CPU 利用率. 我的理解是这个配置变量应该并行化任务调度。

消息来源建议这个标志确实应该在调度程序上使用多个核心。在https://github.com/spotify/luigi/blob/master/luigi/interface.py#L194中,调用了https://github.com/spotify/luigi/blob/master/luigi/worker。 py#L498并行检查.complete()任务的状态。

为了让我的 Luigi 调度程序利用其所有内核,我缺少什么?

0 投票
1 回答
1722 浏览

luigi - Luigi 未能完成 require 方法中列出的所有任务

假设我有一个具有以下依赖结构的任务

子任务自己运行良好。父级正确检查所有子级任务的完成状态。然而,当第一个子任务完成时,调度程序将父任务标记为已完成。带有以下消息:

0 投票
1 回答
2679 浏览

python - 我可以将 luigi 与 Python celery 一起使用吗

我正在为我的网络应用程序使用芹菜。Celery 执行父任务,然后执行进一步的任务管道

芹菜的问题

  1. 我无法获得使用 luigi 获得的依赖关系图和可视化工具,以查看我的父任务的状态

  2. Celery 不提供重新启动失败管道并从失败处开始的机制。

这两件事我可以很容易地从 luigi 那里得到。

所以我在想,一旦 celery 运行父任务,然后在该任务中我执行 Luigi 管道。

是否会有任何问题,即我需要根据 queuesize 自动缩放芹菜工人。这会影响多台机器上的任何 luigi 工人吗?

0 投票
1 回答
1296 浏览

python - 是否可以在 luigi Python 中使用数据库行作为输出

我正在学习 luigi,看到大部分输出是文件系统上的文件。

如果该文件存在,则 luigi 认为该任务已完成。

在我的情况下,我不想写文件,而是想用 postgres 中的状态更新数据库 erecord DONE

我想知道在 luigi 中是否有可能

0 投票
1 回答
1672 浏览

python - Luigi 直接将文件写入 S3

我正在使用 Luigi 创建一个数据管道,并尝试将处理后的数据直接写入 S3 存储桶。我使用的代码是:

运行脚本后,出现错误:

我们可以直接将文件写入 S3 存储桶吗?

0 投票
1 回答
1816 浏览

python-3.x - 路易吉每天运行熊猫脚本?

我有一堆 python 文件,它们对各种数据源进行端到端的文件处理。

例如,survey.py 将使用 pandas 读取文件、添加列、重命名内容、进行一些计算,然后将修改后的文件保存回磁盘。

driver.py 将对该文件遵循相同的过程,等等。这种相同的结构正在发生在数十个文件中。

然后我有一个名为 process_all 的文件,它基本上只是按特定顺序运行每个 python 文件(一些文件依赖于其他文件)。经过进一步研究,我偶然发现了一个名为 luigi 的库,如果我需要扩展,它似乎可以更稳健地完成相同的任务。

问题:我是继续编写单独的 .py 文件来处理数据,还是将所有这些东西都放在我的 luigi 类中?我假设我要处理的每个原始文件都有一个类,对吗?

还是我写出 luigi 类中的所有处理步骤,比如下面的 CleanDriver 类?这似乎会变得非常冗长,因为很多这些单独的代码文件每个都是 20-50 行,而且有几十行。

0 投票
0 回答
289 浏览

python-2.7 - HiveThriftContext 的 get_partitions_by_filter 命令的语法是什么?

目标

我正在尝试检查配置单元表中是否存在部分指定的分区。

细节

我有一个带有两个分区键sourcedate的表。在执行任务之前,我需要检查是否存在某个分区datesource未指定)。

尝试

我可以使用 luigi 的内置 hive 分区目标和默认客户端轻松完成此操作:

但是默认客户端非常非常慢,因为它正在启动 hive 的命令行实例并运行查询。所以我尝试将默认客户端换成节俭的客户端,这发生了:

看起来这两个客户端对部分指定的分区的解释不同。

我已经编写了自己的客户端,它继承自 MetastoreClient 并添加了一些我过去需要的附加功能,所以我不介意添加我自己设计的部分指定的分区检查。看起来客户端具有我需要的功能:

看起来该命令get_partitions_by_filter可能完全符合我的要求,但除了自动生成的预期类型列表之外,我在任何地方都找不到任何文档。而且我在使用更简单的函数时遇到了类似的问题:当我完全指定我知道存在的分区时,我无法获取get_partitionget_partition_by_name找到它们。我确信这是因为我没有以正确的格式提供参数,但我不知道正确的格式是什么,而且我的耐心已经耗尽了猜测。

HiveThriftContext 的 get_partitions_by_filter 命令的语法是什么?

后续问题:您是如何解决这个问题的?

0 投票
1 回答
198 浏览

python - 在 luigi 中自动实例化?

luigi.Task.run中,我们需要将对象序列化为文件/数据库/等:

但为了方便起见,我想跳过pd.read_csv(...)这段代码,因为在重用任务时我必须编写相同的实例化步骤。

有没有像这样在 luigi 中实例化的自动方法?:

0 投票
2 回答
449 浏览

subprocess - 错误:luigi 中未捕获的异常(TypeError:必须是字符串或缓冲区,而不是无)

从 python 代码调用 /triggering Luigi Task 时遇到问题。

  • 基本上我需要像我们在命令行上一样触发一个 luigi 任务,但是来自 python 代码
  • 我正在使用 supbrocess.popen 使用 shell 命令调用 luigi 任务
  • 我有一个名为 test.py 的测试代码,并且在模块 task_scheduler.py 中有一个测试类,其中包含我的 luigi 任务(两个模块都在同一位置/目录中)

但我得到的错误是

谁能建议我在这里做错了什么?如果我使用 shell 提示符,命令“python -m luigi --module task_scheduler TestClass”可以完美运行

0 投票
2 回答
7372 浏览

python-2.7 - 如何从 Python Luigi 登录

我正在尝试建立一个从 Luigi 记录的策略,以便有一个可配置的输出列表,包括标准输出和自定义文件列表。我希望能够在运行时设置日志记录级别。我们的系统使用 Luigi 从 Jenkins 调用 spark。先感谢您。