I'm having trouble to understand how to pass real data to the Spout, For example:
I have this two files (they are working fine):
#! /usr/bin/env python
import os, random, sys, time
for i in xrange(50):
print("%s\t%s"%(os.getpid(), i))
sys.stdout.flush()
time.sleep(random.randint(0,5))
And
#! /usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen,PIPE
p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True, universal_newlines=True)
timeout = 0.1 # seconds
while p:
# remove finished processes from the list
if p.poll() is not None: # process ended
print(p.stdout.read(), end='') # read the rest
p.stdout.close()
processes.remove(p)
# wait until there is something to read
rlist = select([p.stdout], [],[], timeout)[0]
# read a line from each process that has output ready
for f in rlist:
print(f.readline(), end='') #NOTE: it can block
Now imagine that I want to pass those random lines to the spout for the future processing, I was trying this: from uuid import uuid4 from select import select from subprocess import Popen,PIPE import storm
class TwitterSpout(storm.Spout):
def initialize(self, conf, context):
self.pid = os.getpid()
try:
self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True, universal_newlines=True)
except OSError, e:
self.log('%s'%e)
sys.exit(1)
and than in nextTuple():
def nextTuple(self):
timeout = 0.1 # seconds
while self.p:
# remove finished processes from the list
if self.p.poll() is not None: # process ended
self.log ("%s"%self.p.stdout.read()) # read the rest
self.p.stdout.close()
processes.remove(self.p)
# wait until there is something to read
rlist = select([self.p.stdout], [],[], timeout)[0]
# read a line from each process that has output ready
for f in rlist:
self.log ("%s%s"%f.readline()) #NOTE: it can block
msgId = random.randint(0,500)
self.log('MSG IN SPOUT %s\n'%msgId)
storm.emit([f.readline()], id=msgId)
But this structure doesn't work, I'm always getting error "Pipi seems to be broken..."
or if I try different variations of this code I am blocking the process, and Storm never riches the NextTuple. Please help me to solve my problem, or if someone can give me some example how to do similar thing, or just some advice.
Thank you