2

我正在尝试将 Huey 文档中建议的代码组织实施到现有应用程序中,并遵循简单的示例。目标是构建一个每天凌晨 3:00 运行任务的 crontab。

我打开了两个终端选项卡,第一个是消费者运行示例中的脚本:

PYTHONPATH=.:$PYTHONPATH
export WORKER_CLASS=${1:-thread}
huey_consumer.py main.huey --workers=4 -k $WORKER_CLASS -C -S

然后,在另一个选项卡中,我运行 main.py 脚本:

python main.py

配置文件

from huey import SqliteHuey

huey = SqliteHuey(filename='/tmp/huey.db')

任务.py

from config import huey

# Note that this time is 1 minute before whenever I'm debugging.
# I'm using the 3 am example as what we're aiming for in our final product.
@huey.periodic_task(crontab(minute='0', hour='3'))
def run_this_function():
    system = New_Class() # Class instantiation since there's a bunch that happens when a new class is set up 
    system.run_method # Runs a bunch of methods from the class in one location

主文件

from config import huey
from tasks import run_this_function

def main:
    run_this_function()

if __name__ == "__main__":
    main()

该任务立即运行,并且由于我对 Huey 是全新的,不确定我可能会缺少什么以使其按计划工作。我尝试了很多 crontab 组合,不确定是否存在挑战,或者我如何run_this_functionmain方法中调用。任何帮助表示赞赏!

4

1 回答 1

1

因此,首先,您实际上并不想调用run_this_function()自己,因此无需main.py在另一个选项卡中运行您的脚本。您只需要让 huey 消费者运行,因为您希望它负责在请求的时间执行任务。

您的消费者需要能够发现任务,您可以通过将其导入到该主文件中来执行此操作,然后通过该文件启动 Huey 实例(您也在执行此操作)。一个简单的测试可能是将打印语句放在与您定义周期性任务的文件相同的文件中。运行消费者后,您应该会看到您的打印语句。如果没有,您的任务也不会被消费者接走。

其次,我建议不要使用该crontab功能。我过去无法让它工作,相反,您最好编写自己的函数来配置凌晨 3 点。Huey 使用当前时间戳定期调用提供的函数。因此,作为一个可重用的示例,您可以执行以下操作:

def run_at_my_preferred_hour(hour_to_run):
    last_day_it_ran_for = None

    def check_if_task_needs_to_run_for_this_time(current_timestamp):
        if current_timestamp.hour == hour_to_run and current_timestamp.day != last_day_it_ran_for:
            # Save that you already ran the task for this day
            last_day_it_ran_for = current_timestamp.day
            return True
        return False

    return check_if_task_needs_to_run_for_this_time

    
@huey.periodic_task(run_at_my_preferred_hour(3))
def run_this_function():
   ...
于 2020-09-10T23:14:52.273 回答