8

背景:python设置了一个模块来从流式 API 中获取 JSON 对象,并使用 pymongo 将它们(一次批量插入 25 个)存储在 MongoDB 中。为了比较,我还有一个 bash 命令可以curl从同一个流 APIpipemongoimport. 这两种方法都将数据存储在单独的集合中。

我会定期监控这些count()收藏品以检查它们的表现。

到目前为止,我看到该python模块落后于该curl | mongoimport方法大约 1000 个 JSON 对象。

问题: 如何优化我的python模块以与 ? 同步 curl | mongoimport

我无法使用tweetstream,因为我使用的不是 Twitter API,而是第 3 方流媒体服务。

有人可以帮我吗?

Python模块:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

谢谢阅读。

4

2 回答 2

3

最初您的代码中有一个错误。

                if self.chunk_count % 50 == 0
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0

您重置了 chunk_count,但没有重置 tweet_list。因此,您第二次尝试插入 100 个项目(50 个新项目加上 50 个之前已发送到 DB 的项目)。您已解决此问题,但仍然看到性能差异。

整个批量大小的事情结果是一个红鲱鱼。我尝试使用一个大的 json 文件并通过 python 加载它,而不是通过 mongoimport 加载它,Python 总是更快(即使在安全模式下 - 见下文)。

仔细查看您的代码,我意识到问题在于流 API 实际上是以块的形式向您传递数据。您应该只获取这些块并将它们放入数据库中(这就是 mongoimport 正在做的事情)。您的 python 为拆分流、将其添加到列表然后定期向 Mongo 发送批次所做的额外工作可能是我看到的和您看到的之间的区别。

为您的 handle_data() 尝试此代码段

def handle_data(self, data):
    try:
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
    try:
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)

需要注意的一件事是您的python 插入没有在“安全模式”下运行- 您应该通过safe=True在插入语句中添加一个参数来更改它。然后,您将在任何插入失败时获得异常,并且您的 try/catch 将打印暴露问题的错误。

它的性能成本也不高——我目前正在运行一个测试,大约五分钟后,两个集合的大小为 14120 14113。

于 2012-06-02T20:45:17.727 回答
1

摆脱了 StringIO 库。在这种情况下,因为每行都会调用WRITEFUNCTIONcallback handle_data,所以只需JSON直接加载。然而,有时数据中可能包含两个JSON对象。抱歉,我无法发布curl我使用的命令,因为它包含我们的凭据。但是,正如我所说,这是适用于任何流式 API 的普遍问题。


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    
于 2012-06-10T15:50:49.027 回答