
I've been trying to populate a table in a PostgreSQL database using Tweepy and Twitter's Streaming API. I'm very close, and I believe I'm only one line away from getting it. I've looked at many examples, including:

- http://andrewbrobinson.com/2011/07/15/using-tweepy-to-access-the-twitter-stream/
- http://blog.creapptives.com/post/14062057061/the-key-value-store-everyone-ignored-postgresql
- Python tweepy writing to sqlite3 db
- tweepy stream to sqlite database - invalid synatx
- Using tweepy to access Twitter's Streaming API

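I can stream tweets with Tweepy without any trouble, so I know my consumer key, consumer secret, access key and access secret are correct. I've also set up Postgres and successfully connected to the database I created. From a .py file I used psycopg2 to insert hard-coded values into a table in that database, and that works too. I'm getting tweets for the keywords I choose, and I can connect to the table in the database. Now I just need to stream the tweets into the table in my Postgres database. Like I said, I'm so close; any help would be much appreciated.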

This stripped-down script inserts data into the table I want:

import psycopg2

try:
    conn = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
    print("connected")
except psycopg2.OperationalError as e:
    # without a connection nothing below can work, so stop here
    raise SystemExit("unable to connect: %s" % e)

# each dict supplies the values for one row; executemany() runs the INSERT once per dict
namedict = (
    {"first_name": "Joshua", "last_name": "Drake"},
    {"first_name": "Steven", "last_name": "Foo"},
    {"first_name": "David", "last_name": "Bar"},
)

cur = conn.cursor()

cur.executemany("""INSERT INTO testdata(first_name, last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

conn.commit()

Below is the script I've been editing for a while and am now trying to get working:

import psycopg2
import time
import json
from getpass import getpass
import tweepy

consumer_key = 'x'
consumer_secret = 'x'
access_key = 'x'
access_secret = 'x'

connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
cursor = connection.cursor()

#always use this step to begin clean
def reset_cursor():
    cursor = connection.cursor()

class StreamWatcherListener(tweepy.StreamListener):
    def on_data(self, data):
        try:
            print 'before cursor' + data
            connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
            cur = connection.cursor()
            print 'status is: ' + str(connection.status)
            #cur.execute("INSERT INTO tweet_list VALUES (%s)" % (data.text))
            cur.executemany("""INSERT INTO tweets(tweet) VALUES (%(text)s)""", data);
            connection.commit()
            print '---------'
            print type(data)
            #print data
        except Exception as e:
            connection.rollback()
            reset_cursor()
            print "not saving"
            return 
        if cursor.lastrowid == None:
            print "Unable to save"

    def on_error(self, status_code):
        print 'Error code = %s' % status_code
        return True

    def on_timeout(self):
        print 'timed out.....'

print 'welcome'
auth1 = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth1.set_access_token(access_key, access_secret)
api = tweepy.API(auth1)

l = StreamWatcherListener()
print 'about to stream'
stream = tweepy.Stream(auth = auth1, listener = l)

setTerms = ['microsoft']
#stream.sample()
stream.filter(track = setTerms)

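Sorry if the code is a bit messy; I've been trying lots of options. As I said, any suggestions, links to useful examples, etc. would be greatly appreciated, because I've tried everything I can think of and am now at a loss. Many thanks.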
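A likely reason the listener above only reports "not saving": on_data receives data as the raw JSON text of each stream message, not as a dict, while executemany treats its second argument as a sequence of parameter sets, so it would iterate over the individual characters of that string. Whatever exception results is swallowed by the except block. Two smaller issues: reset_cursor() only rebinds a local name, so the module-level cursor never changes, and the final cursor.lastrowid check inspects that module-level cursor, which never ran the INSERT. The sketch below parses the message with json.loads and inserts a single row with execute. The helper names tweet_row_from_data and save_tweet are hypothetical; connection is assumed to be an open psycopg2 connection, and tweets(tweet) is the table from the question.

```python
import json


def tweet_row_from_data(data):
    """Turn the raw JSON string passed to on_data() into a parameter dict.

    Returns None for stream messages that carry no "text" field
    (for example delete or limit notices), so they can be skipped.
    """
    message = json.loads(data)
    text = message.get("text")
    if text is None:
        return None
    return {"text": text}


def save_tweet(connection, data):
    """Insert one tweet using psycopg2-style %(name)s placeholders.

    Assumes `connection` is an open psycopg2 connection (or any object
    with the same cursor()/commit()/rollback() methods).
    Returns True if a row was inserted, False if the message was skipped.
    """
    row = tweet_row_from_data(data)
    if row is None:
        return False
    cur = connection.cursor()
    try:
        # execute(), not executemany(): one tweet is one row, and the
        # parameters are a single mapping rather than a sequence of them.
        cur.execute("INSERT INTO tweets (tweet) VALUES (%(text)s)", row)
        connection.commit()
    except Exception:
        # undo the failed statement, then let the caller see the real error
        connection.rollback()
        raise
    finally:
        cur.close()
    return True
```

Inside the question's StreamWatcherListener, on_data could then shrink to save_tweet(connection, data) followed by return True, reusing the module-level connection instead of opening a new one for every tweet.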


1 Answer


Well, I'm not sure why you're using a class for this, or why you haven't defined __init__ in your class. It seems complicated.

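Here is a basic version of the functions I use for this kind of thing. I've only used sqlite, but the syntax looks basically the same. Maybe you can get something out of it.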

import time

import tweetstream  # third-party streaming client used by this answer

# Assumes username and password (Twitter credentials) and conn/curs
# (an open sqlite3 connection and its cursor) are defined at module level.

def retrieve_tweets(numtweets=10, *args):
    """
    This function optionally takes one or more arguments as keywords to filter tweets.
    It iterates through tweets from the stream that meet the given criteria and sends them
    to the database population function one at a time, so that tweets already saved are
    not lost if the stream is disconnected.

    Both SampleStream and FilterStream access Twitter's stream of status elements.
    """
    filters = [str(key) for key in args]
    if not filters:
        stream = tweetstream.SampleStream(username, password)
    else:
        stream = tweetstream.FilterStream(username, password, track=filters)
    try:
        count = 0
        while count < numtweets:
            for tweet in stream:
                # a check is needed on text as some "tweets" are actually just API operations
                # the language selection doesn't really work but it's better than nothing(?)
                if tweet.get('text') and tweet['user']['lang'] == 'en':
                    if tweet['retweet_count'] == 0:
                        # bundle up the features I want and send them to the db population function
                        bundle = (tweet['id'], tweet['user']['screen_name'], tweet['retweet_count'], tweet['text'])
                        db_initpop(bundle)
                        break
                    else:
                        # a RT has a different structure.  This bundles the original tweet.  Getting the
                        # retweets comes later, after the stream is closed.
                        bundle = (tweet['retweeted_status']['id'], tweet['retweeted_status']['user']['screen_name'],
                                  tweet['retweet_count'], tweet['retweeted_status']['text'])
                        db_initpop(bundle)
                        break
            count += 1
    except tweetstream.ConnectionError as e:
        print('Disconnected from Twitter at %s.  Reason: %s'
              % (time.strftime("%d %b %Y %H:%M:%S", time.localtime()), e.reason))

def db_initpop(bundle):
    """
    This function places basic tweet features in the database.  Note the placeholder values:
    these can act as a check to verify that no further expansion was available for that method.
    """
    # unpack the bundle
    tweet_id, user_sn, retweet_count, tweet_text = bundle
    # sqlite3 uses ? placeholders; inserting NULL in the first column lets SQLite
    # assign the row id (assuming that column is an INTEGER PRIMARY KEY)
    curs.execute("""INSERT INTO tblTweets VALUES (null,?,?,?,?,?,?)""",
        (tweet_id, user_sn, retweet_count, tweet_text, 'cleaned text', 'cleaned retweet text'))
    conn.commit()
    print('Database populated with tweet %s at %s'
          % (tweet_id, time.strftime("%d %b %Y %H:%M:%S", time.localtime())))

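The answer's db_initpop relies on sqlite3's ? placeholders and on SQLite filling in an explicit NULL id. Neither carries over unchanged to psycopg2, which uses %s (or %(name)s) placeholders, and PostgreSQL rejects an explicit NULL in a serial primary key column, so the columns should be named and the id left out. The sketch below runs the answer's insert against an in-memory SQLite database. The tblTweets column names, SCHEMA, PG_INSERT, make_db and bundle_tweet are all hypothetical, and bundle_tweet deliberately keys on the presence of retweeted_status rather than on retweet_count as the answer does.

```python
import sqlite3

# Hypothetical schema: the answer never shows its CREATE TABLE, so these
# column names are assumptions. INTEGER PRIMARY KEY lets SQLite assign
# row_id when NULL is inserted, which the answer's "(null, ...)" relies on.
SCHEMA = """
CREATE TABLE tblTweets (
    row_id INTEGER PRIMARY KEY,
    tweet_id INTEGER,
    user_sn TEXT,
    retweet_count INTEGER,
    tweet_text TEXT,
    cleaned_text TEXT,
    cleaned_rt_text TEXT
)
"""

# Hypothetical psycopg2 equivalent (not executed here): %s placeholders,
# and the id column is omitted so a SERIAL default fills it in.
PG_INSERT = (
    "INSERT INTO tblTweets (tweet_id, user_sn, retweet_count, tweet_text, "
    "cleaned_text, cleaned_rt_text) VALUES (%s, %s, %s, %s, %s, %s)"
)


def make_db():
    """Create an in-memory SQLite database with the assumed schema."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    return conn


def bundle_tweet(tweet):
    """Build the answer's 4-tuple; for a retweet, describe the original tweet."""
    source = tweet.get("retweeted_status", tweet)
    return (source["id"], source["user"]["screen_name"],
            tweet.get("retweet_count", 0), source["text"])


def db_initpop(conn, bundle):
    """Insert one bundle using sqlite3's qmark (?) placeholders."""
    tweet_id, user_sn, retweet_count, tweet_text = bundle
    conn.execute(
        "INSERT INTO tblTweets VALUES (NULL, ?, ?, ?, ?, ?, ?)",
        (tweet_id, user_sn, retweet_count, tweet_text,
         "cleaned text", "cleaned retweet text"),
    )
    conn.commit()
```

With psycopg2 the same step would be cur.execute(PG_INSERT, params) followed by conn.commit(), where params is the six-value tuple without the leading NULL.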
Good luck!

answered 2012-11-15T20:42:23.463