0

所以我现在开始学习python,我绝对爱上了它。

我正在构建一个小型 facebook 数据抓取工具。基本上,它将使用 Graph API 并抓取指定数量用户的名字。它在单个线程中运行良好(或者我猜没有线程)。

我使用在线教程提出了以下多线程版本(更新代码)

import requests
import json
import time
import threading
import Queue

GraphURL = 'http://graph.facebook.com/'
first_names = {} # will store first names and their counts
queue = Queue.Queue()

def getOneUser(url):
    http_response = requests.get(url) # open the request URL
    if http_response.status_code == 200:
        data = http_response.text.encode('utf-8', 'ignore') # Get the text of response, and encode it
        json_obj = json.loads(data) # load it as a json object
        # name = json_obj['name']
        return json_obj['first_name']
        # last = json_obj['last_name']
    return None

class ThreadGet(threading.Thread):
    """ Threaded name scraper """
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            #print 'thread started\n'
            url = GraphURL + str(self.queue.get())
            first = getOneUser(url) # get one user's first name
            if first is not None:
                if first_names.has_key(first): # if name has been encountered before
                    first_names[first] = first_names[first] + 1 # increment the count
                else:
                    first_names[first] = 1 # add the new name
            self.queue.task_done()
            #print 'thread ended\n'

def main():
    start = time.time()
    for i in range(6):
        t = ThreadGet(queue)
        t.setDaemon(True)
        t.start()

    for i in range(100):
        queue.put(i)

    queue.join()

    for name in first_names.keys():
        print name + ': ' + str(first_names[name])

    print '----------------------------------------------------------------'
    print '================================================================'
    # Print top first names
    for key in first_names.keys():
        if first_names[key] > 2:
            print key + ': ' + str(first_names[key])

    print 'It took ' + str(time.time()-start) + 's'

main()

老实说,我不明白代码的某些部分,但我明白了主要思想。输出什么都没有。我的意思是外壳里面什么都没有,所以我相信它会继续运行。

所以我正在做的是填充queuefb上用户ID的整数。然后每个 ID 用于构建 api 调用 URL。getOneUser一次返回一个用户的名称。该task(ID)被标记为“完成”并继续前进。

上面的代码有什么问题?

4

2 回答 2

3

您的使用first_names不是线程安全的。您可以添加一个锁来保护增量。否则代码应该可以工作。您可能会遇到一些 facebook api 限制,即,您应该限制您的请求率。

您可以通过使用线程池并计算主线程中的名称来简化代码:

#!/usr/bin/env python
import json
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads

def get_name(url):
    try:
        return json.load(urllib2.urlopen(url))['first_name']
    except Exception:
        return None # error

urls = ('http://graph.facebook.com/%d' % i for i in xrange(100))
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, urls))
print first_names.most_common()

要查看您遇到的错误,您可以添加日志记录:

#!/usr/bin/env python
import json
import logging
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(threadName)s %(message)s")

def get_name(url):
    try:
        name = json.load(urllib2.urlopen(url))['first_name']
    except Exception as e:
        logging.debug('error: %s url: %s', e, url)
        return None # error
    else:
        logging.debug('done url: %s', url)
        return name

urls = ('http://graph.facebook.com/%d' % i for i in xrange(100))
p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, urls))
print first_names.most_common()

限制每个给定时间段的请求数量的一种简单方法是使用信号量:

#!/usr/bin/env python
import json
import logging
import time
import urllib2
from collections import Counter
from multiprocessing.dummy import Pool # use threads
from threading import _BoundedSemaphore as BoundedSemaphore, Timer

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(threadName)s %(message)s")

class RatedSemaphore(BoundedSemaphore):
    """Limit to 1 request per `period / value` seconds (over long run)."""
    def __init__(self, value=1, period=1):
        BoundedSemaphore.__init__(self, value)
        t = Timer(period, self._add_token_loop,
                  kwargs=dict(time_delta=float(period) / value))
        t.daemon = True
        t.start()

    def _add_token_loop(self, time_delta):
        """Add token every time_delta seconds."""
        while True:
            try:
                BoundedSemaphore.release(self)
            except ValueError: # ignore if already max possible value
                pass
            time.sleep(time_delta) # ignore EINTR

    def release(self):
        pass # do nothing (only time-based release() is allowed)

def get_name(gid, rate_limit=RatedSemaphore(value=100, period=600)):
    url = 'http://graph.facebook.com/%d' % gid
    try:
        with rate_limit:
            name = json.load(urllib2.urlopen(url))['first_name']
    except Exception as e:
        logging.debug('error: %s url: %s', e, url)
        return None # error
    else:
        logging.debug('done url: %s', url)
        return name

p = Pool(5) # 5 concurrent connections
first_names = Counter(p.imap_unordered(get_name, xrange(200)))
print first_names.most_common()

在初始突发之后,它应该每 6 秒发出一次请求。

考虑使用批处理请求

于 2013-05-22T07:51:41.443 回答
2

您的原始run函数仅处理队列中的一项。总之,您只从队列中删除了 5 个项目。

通常run函数看起来像

run(self):
    while True:
         doUsefulWork()

即他们有一个循环导致重复的工作被完成。

[编辑] OP 编辑​​代码以包含此更改。

其他一些有用的尝试:

  • 在函数中添加一个 print 语句run:你会发现它只被调用了 5 次。
  • 删除queue.join()调用,这是导致模块阻塞的原因,然后您将能够探测队列的状态。
  • 将整个主体run放入一个函数中。验证您是否可以以单线程方式使用该函数来获得所需的结果,然后
  • 只用一个工作线程试试,然后最后去
  • 多个工作线程。
于 2013-05-21T17:45:48.397 回答