3

我正在尝试这样做:在一个单独的线程中启动流监听器(stream listener),由它把数据写入队列,之后再处理队列中的数据……但是 Storm 在启动线程之后就什么都不做了,一直卡在那里。

我的代码如下所示:

import os, sys, traceback, random, StringIO, time
import random
from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import pyinotify
import simplejson, pycurl
import sys, signal
import twitter
import tweepy
import Queue
import threading
try:
    import simplejson as json
except ImportError:
    import json

import storm
# Module-wide FIFO handed to the streaming listener (via Starter) and exposed
# as TwitterSpout.queue. maxsize=0 means the queue is unbounded.
queue = Queue.Queue(maxsize=0)

class MyModelParser(tweepy.parsers.ModelParser):
    """Model parser that also attaches the decoded raw JSON to each result.

    The stock ``ModelParser`` builds the model object first. The same payload
    string is then decoded with ``json.loads`` and stored on the result as
    ``_payload``, so the raw data is available next to the model.
    """

    def parse(self, method, payload):
        parsed = super(MyModelParser, self).parse(method, payload)
        raw = json.loads(payload)
        parsed._payload = raw
        return parsed

class CustomStreamListener(tweepy.StreamListener):
    ''' Handles data received from the stream.

    Each incoming status is reduced to its author's screen name and pushed
    onto ``self.queue``. This listener is only the producer; whoever holds
    the same queue is expected to drain it.
    '''
    def __init__(self, api, q):
        """Store the API handle and the output queue.

        :param api: ``tweepy.API`` instance for this listener.
        :param q: queue that receives one string per incoming status.
        """
        # Let the base listener initialise itself; ``api`` is still stored
        # explicitly below, as before.
        super(CustomStreamListener, self).__init__(api)
        self.api = api
        self.queue = q

    def on_status(self, status):
        """Enqueue the author's screen name.

        ``task_done()`` is intentionally not called here. It is the
        consumer's acknowledgement for an item it took with ``get()``.
        Calling it from the producer cancels the ``put()`` in the
        unfinished-task count, makes ``join()`` meaningless, and can make a
        consumer's own ``task_done()`` raise ``ValueError``.
        """
        self.queue.put('%s' % status.author.screen_name)

    def on_error(self, status_code):
        """Keep listening on errors, except disconnect on HTTP 420.

        420 signals rate limiting. Reconnecting straight away makes the
        back-off worse, so returning False (disconnect) is the safe response.
        """
        if status_code == 420:
            return False
        return True # To continue listening

    def on_timeout(self):
        """Keep the stream alive after a timeout."""
        return True # To continue listening

class Starter():
    """Authenticate with Twitter and run a filtered stream that fills a queue.

    NOTE: the constructor blocks for as long as the stream runs, because
    ``stream.filter`` is called synchronously. Build this object inside a
    background thread, e.g. ``threading.Thread(target=Starter, args=(q,))``.
    Never use ``Thread(target=Starter(q))``: that runs the constructor in the
    calling thread before the thread object even exists.
    """

    # Track terms used when the caller does not pass its own list.
    DEFAULT_TRACK = ['justinbieber', 'snooki', 'daddy_yankee',
                     'MikeTyson', 'iamdiddy', 'lala']

    def __init__(self, q, track=None):
        """Start streaming statuses matching ``track`` into ``q``.

        :param q: queue that the stream listener fills with screen names.
        :param track: optional iterable of track terms; defaults to
            ``DEFAULT_TRACK``.
        """
        self.queue = q
        hashtag = list(self.DEFAULT_TRACK) if track is None else list(track)
        auth = self.t_auth()
        api = tweepy.API(auth, parser=MyModelParser())
        # Use the queue this Starter was given, not the module-level global,
        # so a caller-supplied queue actually receives the data.
        stream = tweepy.streaming.Stream(auth, CustomStreamListener(api, self.queue))
        stream.filter(follow=None, track=hashtag)

    def t_auth(self):
        """Build an OAuth handler from the (here blank) app/user credentials."""
        consumer_key=""
        consumer_secret=""
        access_key = ""
        access_secret = ""

        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_key, access_secret)

        return auth

class TwitterSpout(storm.Spout):
    """Storm spout that emits Twitter screen names collected by a background stream."""
    SPOUT_NAME = "TwitterSpout"
    # Shared with the streaming thread: the listener puts, nextTuple gets.
    queue = queue

    def initialize(self, conf, context):
        """Start the Twitter stream in a daemon thread and return immediately.

        The callable goes in ``target`` and its arguments in ``args``.
        Writing ``target=Starter(self.queue)`` would evaluate the blocking
        ``Starter`` constructor right here. ``initialize`` would then never
        return, and Storm would hang.
        """
        self.pid = os.getpid()
        try:
            t = threading.Thread(target=Starter, args=(self.queue,))
            t.daemon = True
            t.start()
        except KeyboardInterrupt:
            # storm.Spout has no ``log`` method; use the module-level helper.
            storm.log('\n\nStopping')
            raise

    def nextTuple(self):
        """Emit one queued screen name, if any, without blocking.

        Storm calls this repeatedly from its run loop and expects it to
        return promptly. An empty queue just means there is nothing to emit
        on this call.
        """
        try:
            screen_name = self.queue.get_nowait()
        except Queue.Empty:
            return
        storm.emit([screen_name])
4

1 个回答

0

使用 pyleus(https://github.com/Yelp/pyleus),并且你的 spout 实现中应该有 `next_tuple(self)` 方法,由它发射(emit)与输出字段(OUTPUT_FIELDS)对应的元组,如下例所示:

from pyleus.storm import Spout


class DummySpout(Spout):
    """Example pyleus spout that emits the same two-field tuple on every call."""

    OUTPUT_FIELDS = ['sentence', 'name']

    def initialize(self):
        # Nothing to set up for this example.
        return None

    def next_tuple(self):
        # Values follow OUTPUT_FIELDS order: (sentence, name).
        sentence = "This is a sentence."
        name = "spout"
        self.emit((sentence, name))


if __name__ == '__main__':
    # Script entry point: build the spout and enter its run loop.
    spout = DummySpout()
    spout.run()

然后编写你的 bolt:

from pyleus.storm import SimpleBolt


class DummyBolt(SimpleBolt):
    """Example pyleus bolt that turns (sentence, name) into one quoted sentence."""

    OUTPUT_FIELDS = ['sentence']

    def process_tuple(self, tup):
        """Emit ``<name> says, "<sentence>"`` anchored to the input tuple.

        :param tup: incoming tuple whose values are (sentence, name), in the
            order DummySpout declares its OUTPUT_FIELDS.
        """
        sentence, name = tup.values
        new_sentence = "{0} says, \"{1}\"".format(name, sentence)
        # Anchor the output to ``tup`` so Storm tracks it in the input's
        # tuple tree for ack/fail.
        self.emit((new_sentence,), anchors=[tup])


if __name__ == '__main__':
    # Script entry point: build the bolt and enter its run loop.
    bolt = DummyBolt()
    bolt.run()

你也可以参考我是如何使用它的:https://github.com/Yelp/pyleus/issues/140

回答于 2015-08-15T10:33:49.127