0

这是我的代码,关键是tweets在多处理中共享变量:

import urllib,urllib2,json,re,datetime,sys,cookielib
from .. import models
from pyquery import PyQuery
from multiprocessing import Process, Pool, Queue, Manager
import os, time, random

def crawl_and_write(q,tweets):
    tweets = tweets
    for tweetHTML in tweets:

        tweetPQ = PyQuery(tweetHTML)
        tweet = models.Tweet()

        usernameTweet = tweetPQ("span:first.username.u-dir b").text();
        txt = re.sub(r"\s+", " ", tweetPQ("p.js-tweet-text").text().replace('# ', '#').replace('@ ', '@'));
        retweets = int(tweetPQ("span.ProfileTweet-action--retweet span.ProfileTweet-actionCount").attr(
            "data-tweet-stat-count").replace(",", ""));
        favorites = int(tweetPQ("span.ProfileTweet-action--favorite span.ProfileTweet-actionCount").attr(
            "data-tweet-stat-count").replace(",", ""));
        dateSec = int(tweetPQ("small.time span.js-short-timestamp").attr("data-time"));
        id = tweetPQ.attr("data-tweet-id");
        permalink = tweetPQ.attr("data-permalink-path");

        geo = ''
        geoSpan = tweetPQ('span.Tweet-geo')
        if len(geoSpan) > 0:
            geo = geoSpan.attr('title')

        tweet.id = id
        tweet.permalink = 'https://twitter.com' + permalink
        tweet.username = usernameTweet
        tweet.text = txt
        tweet.date = datetime.datetime.fromtimestamp(dateSec)
        tweet.retweets = retweets
        tweet.favorites = favorites
        tweet.mentions = " ".join(re.compile('(@\\w*)').findall(tweet.text))
        tweet.hashtags = " ".join(re.compile('(#\\w*)').findall(tweet.text))
        tweet.geo = geo

        # add tweet into queue
        q.put(tweet)

def read_result(q):
    while True:
        # get value from the queue
        tweet = q.get(True)
        results.append(tweet)
        resultsAux.append(tweet)

        if receiveBuffer and len(resultsAux) >= bufferLength:
            receiveBuffer(resultsAux)
            length += len(resultsAux)
            # tell the finish percerntage of the process
            percent = length / float(tweetCriteria.maxTweets) * 100
            resultsAux = []
            print '%.2f %% of tweets required was finished, we have %s tweets now' % (percent, length)

        if tweetCriteria.maxTweets > 0 and len(results) >= tweetCriteria.maxTweets:
            active = False

def getTweets(tweetCriteria, receiveBuffer=None, bufferLength=100, proxy=None):
    refreshCursor = ''

    length = 0
    results = []
    resultsAux = []
    cookieJar = cookielib.CookieJar()

    if hasattr(tweetCriteria, 'username') and (tweetCriteria.username.startswith("\'") or tweetCriteria.username.startswith("\"")) and (tweetCriteria.username.endswith("\'") or tweetCriteria.username.endswith("\"")):
        tweetCriteria.username = tweetCriteria.username[1:-1]

    active = True
    manager = Manager()


    while active:
        json = TweetManager.getJsonReponse(tweetCriteria, refreshCursor, cookieJar, proxy)
        if len(json['items_html'].strip()) == 0:
            break

        refreshCursor = json['min_position']            

        tweets = PyQuery(json['items_html'])('div.js-stream-tweet')
        tweets = manager(tweets)

        if len(tweets) == 0:
            break

        pw = Process(target = crawl_and_write, args=(q,tweets))
        pr = Process(target = read_result, args=(q,))
        # start pw
        pw.start()
        # start pr
        pr.start()
        # wait for the pw end
        pw.join()

但是,错误在代码中显示tweets = manager(tweets)Pickle.PicklingError:Can't pickle<class 'pyquery.pyquery.NoDefault'>:attribute lookup pyqery.pyquery.NoDefault failed.

我猜这是因为 pyquery 类型,但仍然不知道如何修复它?

任何想法都会有所帮助。

4

1 回答 1

0

Manager 使用 pickle 将对象序列化为二进制流,然后通过网络进行转换。所以不是每个对象都可以序列化,可能你无法解决这个问题。

于 2017-08-30T04:30:26.527 回答