1

我正在使用带有 python 的应用程序引擎

我在价值列表上每天都有任务要做。这些值存储在目标属性下的“项目”值中:

myproject1.targets=['foo','bar','foo2','bar2','foo3','bar3','foo4','bar4','foo5','bar5']

我的目标是将对 url 的调用排入队列:url_to_my_worker对于每个值,将值作为参数。

我的数据库中目前只有一个项目对象。

我运行 schedule_daily_projects_tasks 基本上为每个配置文件对象排队 schedule_daily_profile_tasks

class schedule_daily_projects_tasks(webapp.RequestHandler):
    def post(self):
        key=self.request.get('key')
        pro=project.get(key)
        profiles=my_profile.gql("WHERE project=:1",pro)
        logging.info(profiles)
        for profile in profiles:
            taskqueue.add(url='/control/schedule_daily_profile_tasks', params={'key': profile.key()})

然后为每个配置文件运行“schedule_daily_profile_tasks”。

class schedule_daily_profile_tasks(webapp.RequestHandler):
    def post(self):
        key=self.request.get('key')
        profile=my_profile.get(key)
        pro=profile.project
        for i in range(1, 6):
            now=datetime.now()
            tim=datetime(year=now.year, month=now.month, day=now.day, hour=8+i)
            screen_name=pro.targets.pop()
            taskqueue.add(url='/url_to_my_worker', params={'profk': key, 'screen_name':screen_name}, eta=tim)
            pro.put()

假设我的数据库中有 5 个配置文件对象:profile1 到 profile5 所以如果一切顺利,我应该有 5 个任务排队到 url '/url_to_my_worker',参数:

1) params={'profk': profile1.key(), 'screen_name':'bar5'}
2) params={'profk': profile2.key(), 'screen_name':'foo5'}
3) params={'profk': profile3.key(), 'screen_name':'bar4'}
4) params={'profk': profile4.key(), 'screen_name':'foo4'}
5) params={'profk': profile5.key(), 'screen_name':'bar3'}

但相反,我得到:

1) params={'profk': profile1.key(), 'screen_name':'bar5'}
2) params={'profk': profile2.key(), 'screen_name':'bar5'}
3) params={'profk': profile3.key(), 'screen_name':'bar5'}
4) params={'profk': profile4.key(), 'screen_name':'bar5'}
5) params={'profk': profile5.key(), 'screen_name':'bar5'}

我相信任务运行得太快了,所以 n°2 在 n°1 “弹出”之前就开始了。因此 myproject1.targets 具有相同的值。

如何确保列表中的每个值都使用一次且仅使用一次?

非常感谢

4

4 回答 4

2

您遇到的问题或多或少与您描述的一样:您已将多个任务排入队列,这些任务试图同时修改同一个数据存储对象。由于您不使用事务,因此多个任务最终会检索相同的数据,执行相同的操作,然后覆盖彼此的结果。

您可以使用数据存储事务来避免这种情况,但更好的解决方案是重组您的任务,以便只有一个任务正在修改每个数据存储实体。这样,您就无需担心同步或事务问题。

于 2012-07-16T03:12:40.640 回答
1

您可能需要考虑将任务作为批处理添加:

targets=pro.targets
tasks=[]
...
  screen_name=targets.pop()
  tasks.append(taskqueue.Task(url='/url_to_my_worker', params={'profk': key, 'screen_name':screen_name}, eta=tim))
...
pro.put()
taskqueue.Queue().add(tasks)

请注意,您可以通过创建将 max_concurrent_requests 指定为 1 的 queue.yaml 来避免特定队列上的任务同时运行:

queue:
- name: default
  max_concurrent_requests: 1
于 2012-07-16T00:30:31.307 回答
0

Python 带有一个内置的线程安全队列库模块。

http://docs.python.org/library/queue.html

Queue 模块实现了多生产者、多消费者队列。当必须在多个线程之间安全地交换信息时,它在线程编程中特别有用。该模块中的 Queue 类实现了所有必需的锁定语义。这取决于 Python 中线程支持的可用性;请参阅线程模块。

您可以这样做,以便在调用 pop() 时,它只是阻塞,直到任何锁被释放。

编辑:链接的示例页面使用队列模块来完成它的工作。

import Queue
targets = ['foo','bar','foo2','bar2','foo3','bar3','foo4','bar4','foo5','bar5']
queue = Queue.LifoQueue() # Last in first out
for target in targets:
    queue.put(target)
myproject1.targets = queue
##########################
class schedule_daily_profile_tasks(webapp.RequestHandler):
    ....
    screen_name=pro.targets.get(block=true)
于 2012-07-15T17:24:22.910 回答
0

如果您的项目记录中没有大量的配置文件元素:Schedule_daily_projects_tasks() 可以将您的所有配置文件元素连接到序列化字符串中。将该字符串作为参数传递给任务队列。每个任务进程从字符串中弹出一个元素,对其进行处理,如果没有发生异常,则使用较短的序列化字符串将任务排入队列。如果您正在对同一实体进行更新,请以两秒左右的时间延迟将任务排入队列。这也将有助于调节为处理任务队列而启动的实例数量,从而降低成本。

于 2012-07-16T22:49:25.010 回答