我一直在尝试使用 Tweepy 和 Twitter 的 Streaming API 在 PostreSQL 数据库中填充表。我非常接近,我相信我离得到它只有一条线。我看过很多例子,包括: http ://andrewbrobinson.com/2011/07/15/using-tweepy-to-access-the-twitter-stream/ http://blog.creapptives.com/post/14062057061 /the-key-value-store-everyone-ignored-postgresql Python tweepy 写入 sqlite3 db tweepy 流到 sqlite 数据库 - 无效的 synatx 使用 tweepy 访问 Twitter 的 Streaming API 等
我可以很容易地使用 Tweepy 流式传输推文,所以我知道我的消费者密钥、消费者密钥、访问密钥和访问密钥是正确的。我还设置了 Postgres,并成功连接到我创建的数据库。我使用 .py 文件中的 psycopg2 将硬编码值测试到我的数据库中的表中,这也有效。我正在根据我选择的关键字获取推文,并成功连接到数据库中的表。现在我只需要将推文流式传输到我的 postgres 数据库中的表中。就像我说的,我是如此接近,任何帮助将非常感激。
这个精简的脚本将数据插入到我想要的表中:
import psycopg2
try:
conn = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
print "connected"
except:
print "unable to connect"
namedict = (
{"first_name":"Joshua", "last_name":"Drake"},
{"first_name":"Steven", "last_name":"Foo"},
{"first_name":"David", "last_name":"Bar"}
)
cur = conn.cursor()
cur.executemany("""INSERT INTO testdata(first_name, last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict);
conn.commit()
下面是我已经编辑了一段时间的脚本,现在试图让它工作:
import psycopg2
import time
import json
from getpass import getpass
import tweepy
consumer_key = 'x'
consumer_secret = 'x'
access_key = 'x'
access_secret = 'x'
connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
cursor = connection.cursor()
#always use this step to begin clean
def reset_cursor():
cursor = connection.cursor()
class StreamWatcherListener(tweepy.StreamListener):
def on_data(self, data):
try:
print 'before cursor' + data
connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
cur = connection.cursor()
print 'status is: ' + str(connection.status)
#cur.execute("INSERT INTO tweet_list VALUES (%s)" % (data.text))
cur.executemany("""INSERT INTO tweets(tweet) VALUES (%(text)s)""", data);
connection.commit()
print '---------'
print type(data)
#print data
except Exception as e:
connection.rollback()
reset_cursor()
print "not saving"
return
if cursor.lastrowid == None:
print "Unable to save"
def on_error(self, status_code):
print 'Error code = %s' % status_code
return True
def on_timeout(self):
print 'timed out.....'
print 'welcome'
auth1 = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth1.set_access_token(access_key, access_secret)
api = tweepy.API(auth1)
l = StreamWatcherListener()
print 'about to stream'
stream = tweepy.Stream(auth = auth1, listener = l)
setTerms = ['microsoft']
#stream.sample()
stream.filter(track = setTerms)
对不起,如果代码有点乱,但一直在尝试很多选择。就像我说的那样,任何建议、有用示例的链接等都将不胜感激,因为我已经尝试了我能想到的一切,现在正在长途跋涉。万分感谢。