0

我有 python 代码,它需要一堆任务并将它们分发到集群上的不同线程或不同节点。我总是最终编写一个主脚本driver.py,它需要两个命令行参数:--run-all--run-task. 第一个只是一个包装器,它遍历所有任务,然后调用driver.py --run-task每个任务作为参数传递。例子:

== driver.py ==
# Determine the current script
DRIVER = os.path.abspath(__file__)
(opts, args) = parser.parse_args()
if opts.run_all is not None:
  # Run all tasks
   for task in opts.run_all.split(","):
     # Call driver.py again with a specific task
     cmd = "python %s --run-task %s" %(DRIVER, task)
     # Execute on system
     distribute_cmd(cmd)
elif opts.run_task is not None:
  # Run on an individual task
  # code here for processing a task...

然后用户会调用:

$ driver.py --run-all task1,task2,task3,task4

每个任务都会被分发。

该函数distribute_cmd接受一个 shell 可执行命令,并以系统特定的方式发送到节点或线程。之所以driver.py要找到自己的名字并调用自己是因为distribute_cmd需要一个可执行的shell命令;例如,它不能采用函数名称。

这种考虑导致我设计了一个具有两种模式并且必须调用自身的驱动程序脚本。这有两个复杂性:(1)脚本必须找到自己的路径 via__file__和(2)在将其制作成 Python 包时,不清楚driver.py应该去哪里。它应该是一个可执行脚本,但如果我把它放在setup.py'sscripts=中,那么我将不得不找出脚本所在的位置(请参阅从 Python distutils 中的 setup.py 查找脚本目录的正确方法?)。这似乎不是一个好的解决方案。

有什么替代设计?请记住,任务的分配必须产生一个可执行的命令,该命令可以作为字符串传递给distribute_cmd. 谢谢。

4

1 回答 1

1
  • 您正在寻找的是一个已经完全满足您需要的库,例如FabricCelery.
  • 如果您不使用节点,我建议您使用multiprocessing.
  • 这是一个与这个稍微相似的问题

为了能够远程执行,您需要:

  • ssh访问该框,在这种情况下,您可以使用它Fabric来发送您的命令。
  • 服务器SocketServer、tcp 服务器或任何可以接受连接的服务器。
  • 将等待数据的代理或客户端,如果您使用代理,您也可以使用代理来处理您的消息。Celery允许您做一些管道,一个结束puts消息在队列中,而另一端gets消息来自队列。如果消息是要执行的命令,那么代理可以进行os.system()调用,或者调用subprocess.Popen()

芹菜示例:

 import os
 from celery import Celery
 celery = Celery('tasks', broker='amqp://guest@localhost//')
 @celery.task
 def run_command(command):
    return os.system(command)

然后,您将需要一个绑定在队列上并等待任务执行的工作人员。文档中的更多信息

面料示例:

编码:

from fabric.api import run
def exec_remotely(command):
   run(command)

调用:

$ fab exec_remotely:command='ls -lh'

文档中的更多信息

批处理系统案例: 回到问题...

  • distribute_cmd是会打电话的东西bsub somescript.sh
  • 您只需要查找文件,因为您将使用其他参数重新执行相同的脚本
  • 由于上述原因,您可能无法提供正确的distutils脚本。

让我们质疑这个设计。

  • 为什么需要使用相同的脚本?
  • 您的驱动程序可以编写脚本然后调用 bsub 吗?
  • 可以使用临时文件吗?
  • 所有节点实际上共享一个文件系统吗?
  • 你怎么知道文件将存在于节点上?

例子:

TASK_CODE = {
   'TASK1': '''#!/usr/bin/env python
#... actual code for task1 goes here ...
''',
   'TASK2': '''#!/usr/bin/env python
#... actual code for task2 goes here ...
'''}
# driver portion
(opts, args) = parser.parse_args()
if opts.run_all is not None:
   for task in opts.run_all.split(","):
      task_path = '/tmp/taskfile_%s' % task
      with open(task_path, 'w') as task_file:
         task_file.write(TASK_CODE[task])
      # note: should probably do better error handling.
      distribute_cmd(task_path)
于 2012-10-20T17:39:20.240 回答