
Problem

Celery workers hang during task execution when the task calls into a package that accesses a ZEO server. However, if I access the server directly within tasks.py, there is no problem at all.

Background

I have a program that reads and writes to a ZODB file. Because I want multiple users to be able to access and modify this database concurrently, I have it managed by a ZEO server, which should make it safe across multiple processes and threads. I define the database within a module of my program:

from ZEO import ClientStorage
from ZODB.DB import DB

addr = 'localhost', 8090
storage = ClientStorage.ClientStorage(addr, wait=False)
db = DB(storage)

SSCCE

I'm obviously attempting more complex operations, but let's assume I only want the keys of a root object, or its children. I can reproduce the problem in this context.

I create dummy_package with the above code in a module, databases.py, and a bare-bones module meant to perform database access:

# main.py

def get_keys(dict_like):
    return dict_like.keys()

If I don't try any database access with dummy_package, I can import the database and access root without issue:

# tasks.py
from celery.task import task

from dummy_package import databases

@task()
def simple_task():
    connection = databases.db.open()
    keys = connection.root().keys()
    connection.close()
    databases.db.close()
    return keys  # Works perfectly

However, passing a connection or a child of root to a function in dummy_package makes the task hang indefinitely:

# tasks.py
from dummy_package import databases, main

@task()
def simple_task():
    connection = databases.db.open()
    root = connection.root()
    ret = main.get_keys(root)  # Hangs indefinitely
    ...

If it makes any difference, these Celery tasks are accessed by Django.

Question

So, first of all, what's going on here? Is there some sort of race condition caused by accessing the ZEO server in this way?

I could make all database access Celery's responsibility, but that will make for ugly code. Furthermore, it would ruin my program's ability to function as a standalone program. Is it not possible to interact with ZEO within a routine called by a Celery worker?


2 Answers


Do not save an open connection or its root object as a global.

You need one connection per thread. ZEO makes it possible for multiple threads to access the database, but it sounds like you are using something that is not thread-local (e.g. a module-level global in databases.py).

Save the db as a global, but call db.open() during each task. See http://zodb.readthedocs.org/en/latest/api.html#connection-pool
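A minimal sketch of that pattern, for illustration only: the db object is shared, while each call opens and closes its own connection. StubDB and StubConnection are hypothetical stand-ins for ZODB's DB and connection objects, so the example runs without a ZEO server; only open(), root() and close() are taken from the question's code, and get_root_keys is an assumed helper name.

```python
# Sketch only: StubDB and StubConnection are hypothetical stand-ins for
# ZODB.DB and its connections, so this runs without a ZEO server.
import threading


class StubConnection(object):
    def __init__(self, data):
        self._data = data
        self.closed = False

    def root(self):
        return self._data

    def close(self):
        self.closed = True


class StubDB(object):
    """Plays the role of the module-level `db` in databases.py."""

    def __init__(self, data):
        self._data = data
        self.opened = []

    def open(self):
        # Hand out a new connection object on every call.
        conn = StubConnection(self._data)
        self.opened.append(conn)
        return conn


db = StubDB({'a': 1, 'b': 2})  # sharing the db object as a global is fine


def get_root_keys(database):
    # Open the connection inside the task body, never at import time.
    connection = database.open()
    try:
        return sorted(connection.root().keys())
    finally:
        # Close only the connection; the shared db stays open for other tasks.
        connection.close()


results = []


def worker():
    results.append(get_root_keys(db))


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With the real database, the same function body would receive databases.db instead of the stub. Note that it does not call db.close(), so later tasks in the same worker can still open connections.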

Answered 2013-06-21T14:37:59.553

I don't completely understand what's going on, but I'm thinking the deadlock has something to do with the fact that Celery uses multiprocessing by default for concurrency. Switching over to using Eventlet for tasks that need to access the ZEO server solved my problem.

My process

  1. Start up one worker that uses Eventlet, and one that uses standard multiprocessing.

    celery is the name of the default queue (for historical reasons), so have the Eventlet worker handle this queue:

    $ celery worker --concurrency=500 --pool=eventlet --loglevel=debug \
                    -Q celery --hostname eventlet_worker
    $ celery worker --loglevel=debug \
                    -Q multiprocessing_queue --hostname multiprocessing_worker
    
  2. Route tasks which need standard multiprocessing to the appropriate queue. All others will be routed to the celery queue (Eventlet-managed) by default. (If using Django, this goes in settings.py):

    CELERY_ROUTES = {'project.tasks.ex_task': {'queue': 'multiprocessing_queue'}}
    
Answered 2013-07-01T16:33:27.253