1

I'm new to dramatiq an experimenting for a couple of days and I'm not getting a worker to work within a (simple) script.

test.py


import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results
from dramatiq.worker import Worker

redis_broker = RedisBroker(host="127.0.0.1", port=6379)

results_backend = RedisBackend(url="redis://127.0.0.1:6379")
redis_broker.add_middleware(Results(backend=results_backend))

dramatiq.set_broker(redis_broker)

worker = Worker(
    broker=redis_broker
)
worker.start()

@dramatiq.actor(queue_name="default", max_retries=1, store_results=True)
def print_hello_world():
    print("Hello World!")

print_hello_world.send()

result (Redis):

127.0.0.1:6379> keys *
1) "dramatiq:default.msgs"
2) "dramatiq:default"
3) "dramatiq:__heartbeats__"

But when starting a dramatiq deamon from the folder with test.py:

$ dramatiq test

the result is what I expected

result (Redis)

127.0.0.1:6379> keys *
1) "3f11649148820d957b2945da46b3c2b7"
2) "dramatiq:__heartbeats__"

It seems that the worker is not receiving the message in this way. It's hard to find examples on the internet setting up a worker from within a script.

Is there some one who can help my getting this work?

4

1 回答 1

0

您可以从此链接参考https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311

"""
Refer from https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
import os
import sys

from dramatiq.cli import (
    CPUS,
    HAS_WATCHDOG,
    main as dramatiq_worker,
    make_argument_parser as dramatiq_argument_parser,
)

from app.core.config import get_settings
from app.tasks import rabbitmq_broker
from app.tasks.manual_selenium import demo
from app.utils.logger import logger


def list_managed_actors(broker, queues):
    queues = set(queues)
    all_actors = broker.actors.values()
    if not queues:
        return all_actors
    else:
        return [a for a in all_actors if a.queue_name in queues]


def format_actor(actor):
    return "%s@%s" % (actor.actor_name, actor.queue_name)


def guess_code_directory(broker):
    actor = next(iter(broker.actors.values()))
    modname, *_ = actor.fn.__module__.partition('.')
    mod = sys.modules[modname]
    return os.path.dirname(mod.__file__)


def worker(verbose=0, processes=CPUS, threads=8, queues=None, broker=rabbitmq_broker):
    """Run dramatiq workers.

    Setup Dramatiq with broker and task modules from Flask app.

    \b
    examples:
      # Run dramatiq with 1 thread per process.
      $ flask worker --threads 1

    \b
      # Listen only to the "foo" and "bar" queues.
      $ flask worker -Q foo,bar

    \b
      # Consuming from a specific broker
      $ flask worker mybroker
    """
    # Plugin for flask.commands entrypoint.
    #
    # Wraps dramatiq worker CLI in a Flask command. This is private API of
    # dramatiq.
    # TODO Plugin for fastapi

    parser = dramatiq_argument_parser()

    command = [
        "--processes", str(processes),
        "--threads", str(threads),
        # This module does not have broker local. Thus dramatiq fallbacks to
        # global broker.
        __name__,
    ]

    if get_settings().DEBUG:
        verbose = max(1, verbose)
        if HAS_WATCHDOG:
            command += ["--watch", guess_code_directory(broker)]

    queues = queues.split(",") if queues else []
    if queues:
        command += ["--queues"] + queues
    command += verbose * ['-v']
    args = parser.parse_args(command)

    logger.info("Able to execute the following actors:")
    for actor in list_managed_actors(broker, queues):
        logger.info("\t%s." % format_actor(actor))

    dramatiq_worker(args)


if '__main__' == __name__:
    worker()


于 2021-10-15T06:42:07.493 回答