1

我正在创建一个任务(通过子类化 celery.task.Task)来创建与 Twitter 的流 API 的连接。对于 Twitter API 调用,我使用的是 tweepy。正如我从 celery-documentation 中读到的,“任务不会针对每个请求进行实例化,而是在任务注册表中注册为全局实例。” 我期待每当我为任务调用 apply_async (或延迟)时,我将访问最初实例化但不会发生的任务。相反,会创建自定义任务类的新实例。我需要能够访问原始自定义任务,因为这是我可以终止由 tweepy API 调用创建的原始连接的唯一方法。

如果这有帮助,这里有一些代码:

from celery import registry
from celery.task import Task

class FollowAllTwitterIDs(Task):
    def __init__(self):
        # requirements for creation of the customstream
        # goes here. The CustomStream class is a subclass
        # of tweepy.streaming.Stream class

        self._customstream = CustomStream(*args, **kwargs)

    @property
    def customstream(self):
        if self._customstream:
            # terminate existing connection to Twitter
            self._customstream.running = False
        self._customstream = CustomStream(*args, **kwargs)

    def run(self):
        self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()

        self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]

对于 Django 视图

def connect_to_twitter(request):
    if request.method == 'POST':
        do_stuff_here()
        .
        .
        .

        follow_all_twitterids.apply_async(args=[], kwargs={})

     return

任何帮助,将不胜感激。:D

编辑:

对于问题的其他上下文,只要调用 filter() 方法,CustomStream 对象就会创建一个 httplib.HTTPSConnection 实例。每当有另一次尝试创建连接时,都需要关闭此连接。通过将 customstream.running 设置为 False 来关闭连接。

4

1 回答 1

0

任务应该只实例化一次,如果你认为不是出于某种原因,我建议你添加一个

print("INSTANTIATE") 导入回溯 traceback.print_stack()

Task.__init__方法,所以你可以知道这会发生在哪里。

我认为您的任务可以更好地表达如下:

from celery.task import Task, task

class TwitterTask(Task):
    _stream = None
    abstract = True

    def __call__(self, *args, **kwargs):
        try:
            return super(TwitterTask, self).__call__(stream, *args, **kwargs)
        finally:
            if self._stream:
                self._stream.running = False

    @property
    def stream(self):
        if self._stream is None:
            self._stream = CustomStream()
        return self._stream

@task(base=TwitterTask)
def follow_all_ids():
    ids = get_list_of_ids_to_follow()
    follow_all_ids.stream.filter(follow=ids, async=false)
于 2011-10-26T15:47:29.403 回答