0

问题已解决,请参阅帖子末尾的解决方案

我需要帮助来估计我的 tweepy 程序调用带有位置过滤器的 Twitter Stream API 的运行时间。

在我启动它之后,它已经运行了 20 多分钟,这比我预期的要长。我是 Twitter Stream API 的新手,并且只使用了 REST API 几天。在我看来,REST API 会在几秒钟内给我 50 条推文,很简单。但是这个 Stream 请求需要更多时间。我的程序没有死在我身上或出现任何错误。所以不知道是不是有什么问题。如果有,请指出。

总之,如果您认为我的代码是正确的,您能否提供运行时间的估计?如果您认为我的代码有误,您能帮我修复它吗?

先感谢您!

这是代码:

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

box = [-86.33,41.63,-86.20,41.74]

class CustomStreamListener(tweepy.StreamListener):
    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        return True # Don't kill the stream
    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        return True # Don't kill the stream

stream = tweepy.streaming.Stream(auth, CustomStreamListener()).filter(locations=box).items(50)
stream

我尝试了来自http://docs.tweepy.org/en/v3.4.0/auth_tutorial.html#auth-tutorial的方法显然它对我不起作用......这是我的代码如下。您介意提供任何意见吗?如果您有一些工作代码,请告诉我。谢谢!

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text)
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
myStream.filter(track=['python'], locations=(box), async=True)

这是错误消息:

Traceback (most recent call last):
  File "test.py", line 26, in <module>
    myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
TypeError: 'MyStreamListener' object is not callable

问题解决了!请参阅下面的解决方案

经过另一轮调试,以下是可能对同一主题感兴趣的人的解决方案:

# Import Tweepy, sys, sleep, credentials.py
try:
    import json
except ImportError:
    import simplejson as json
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text.encode('utf-8'))
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(api.auth, listener=myStreamListener)
myStream.filter(track=['NYC'], locations=(box), async=True)
4

1 回答 1

2

核心问题:我认为您误解了 Stream 的含义。

Tl; dr:您的代码正在运行,您只是没有对返回的数据做任何事情。

其余 API 调用是对信息的单个调用。您提出请求,Twitter 会发回一些信息,这些信息会分配给您的变量。

Tweepy中的 StreamObject(您已创建为stream)使用您的搜索参数打开与 twitter 的连接,而 Twitter 则将 Tweets 流式传输到它。永远。

来自 Tweepy 文档:

流 api 与 REST api 完全不同,因为 REST api 用于从 twitter 中提取数据,但流 api 将消息推送到持久会话。与使用 REST API 相比,这允许流式 API 实时下载更多数据。

因此,您需要构建一个处理程序(streamListener在 tweepy 的术语中是 ),就像打印出推文的这个处理程序一样。.

额外的

来自痛苦经验的警告 - 如果您要尝试将推文保存到数据库:Twitter 可以并且将会以比您将它们保存到数据库的速度更快的速度将对象流式传输给您。这将导致您的 Stream 断开连接,因为推文在 Twitter 上备份,并且超过一定程度的备份(不是实际短语),它们只会断开您的连接。

我通过使用django-rq将保存的作业放入作业队列来处理这个问题 - 这样,我可以每秒处理数百条推文(在高峰期),它会变得平滑。你可以在下面看到我是如何做到的。如果您不使用 django 作为框架,Python-rq 也可以使用。该read both方法只是一个从推文中读取并将其保存到 postgres 数据库的函数。在我的具体情况下,我通过 Django ORM 使用该django_rq.enqueue函数来做到这一点。

__author__ = 'iamwithnail'

from django.core.management.base import BaseCommand, CommandError
from django.db.utils import DataError
from harvester.tools import read_both
import django_rq

class Command(BaseCommand):

    args = '<search_string search_string>'
    help = "Opens a listener to the Twitter stream, and tracks the given string or list" \
           "of strings, saving them down to the DB as they are received."


    def handle(self, *args, **options):
        try:
            import urllib3.contrib.pyopenssl
            urllib3.contrib.pyopenssl.inject_into_urllib3()
        except ImportError:
            pass

        consumer_key = '***'
        consumer_secret = '****'
        access_token='****'
        access_token_secret_var='****'
        import tweepy
        import json

        # This is the listener, responsible for receiving data
        class StdOutListener(tweepy.StreamListener):
            def on_data(self, data):
                decoded = json.loads(data)
                try:
                    if decoded['lang'] == 'en':
                        django_rq.enqueue(read_both, decoded)
                    else:
                        pass
                except KeyError,e:
                    print "Error on Key", e
                except DataError, e:
                    print "DataError", e
                return True


            def on_error(self, status):
                print status


        l = StdOutListener()
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret_var)
        stream = tweepy.Stream(auth, l)
stream.filter(track=args)

编辑:您的后续问题是由错误地调用侦听器引起的。

myStreamListener = MyStreamListener() #creates an instance of your class

你有这个:

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())

当您使用(). 所以应该是:

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)

事实上,也许可以更简洁地写成:

myStream = tweepy.Stream(api.auth,myStreamListener)
于 2017-03-10T09:50:34.763 回答