
I'm having trouble understanding how to pass real data to the Spout. For example:

I have these two files (they are working fine):

#! /usr/bin/env python

import os, random, sys, time

for i in xrange(50):
    print("%s\t%s"%(os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.randint(0,5))

And

#! /usr/bin/env python

from __future__ import print_function
from select import select
from subprocess import Popen,PIPE

p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True) 

timeout = 0.1 # seconds
while p:
    # remove finished processes from the list 
    if p.poll() is not None: # process ended
        print(p.stdout.read(), end='') # read the rest
        p.stdout.close()
        processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block

Now imagine that I want to pass those random lines to the spout for future processing. I was trying this:

from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import storm

class TwitterSpout(storm.Spout):

    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True)
        except OSError, e:
            self.log('%s'%e)
            sys.exit(1)

and then in nextTuple():

def nextTuple(self):
    timeout = 0.1 # seconds
    while self.p:
        # remove finished processes from the list 
        if self.p.poll() is not None: # process ended
            self.log ("%s"%self.p.stdout.read()) # read the rest
            self.p.stdout.close()
            processes.remove(self.p)

        # wait until there is something to read
        rlist = select([self.p.stdout], [],[], timeout)[0]

        # read a line from each process that has output ready
        for f in rlist:
            self.log ("%s%s"%f.readline()) #NOTE: it can block
            msgId = random.randint(0,500)
            self.log('MSG IN SPOUT %s\n'%msgId)
            storm.emit([f.readline()], id=msgId)

But this structure doesn't work. I always get the error "Pipe seems to be broken...", or, if I try different variations of this code, the process blocks and Storm never reaches nextTuple. Please help me solve this problem; an example of how to do something similar, or just some advice, would also be welcome. Thank you.


1 Answer


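There could be multiple issues.

There is no break in the while loop, so it loops forever.

You call f.readline() twice. You probably meant to call it only once after each select.

To avoid blocking, use data = os.read(f.fileno(), 1024) after select.

I don't know whether it is acceptable for nextTuple() to block until the child process exits.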
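
A minimal sketch of the select plus os.read suggestion, assuming Python 3 on a POSIX system (select does not work on pipes on Windows). The helper name read_available, the 0.1 second timeout and the inline child command are hypothetical and only for illustration; they are not part of Storm's API. Because os.read returns whatever bytes are available rather than whole lines, the sketch keeps the trailing partial line in a buffer between calls.

```python
import os
import sys
from select import select
from subprocess import Popen, PIPE


def read_available(p, buf, timeout=0.1):
    """Return (complete_lines, leftover_bytes, eof) after one short wait.

    `buf` carries the partial line left over from the previous call.
    """
    eof = False
    # Wait at most `timeout` seconds for the pipe to become readable.
    if select([p.stdout], [], [], timeout)[0]:
        # os.read returns the bytes that are ready (up to 1024) and does
        # not wait for a full line, unlike readline().
        data = os.read(p.stdout.fileno(), 1024)
        if data:
            buf += data
        else:
            eof = True  # an empty read means the child closed its stdout
    # Everything before the last newline is complete; the rest is kept.
    *lines, buf = buf.split(b'\n')
    return lines, buf, eof


if __name__ == '__main__':
    # Hypothetical stand-in for rand_lines.py.
    child_code = "for i in range(3): print(i, flush=True)"
    p = Popen([sys.executable, '-c', child_code], stdout=PIPE)
    buf, eof = b'', False
    while not eof:
        lines, buf, eof = read_available(p, buf)
        for line in lines:
            print(line.decode('ascii', 'ignore'))
    p.stdout.close()
    p.wait()
```

In a spout, one call to such a helper per nextTuple() invocation would bound the wait to the timeout instead of looping until the child exits.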

If all you do is read lines from the subprocess, then you don't need select:

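import os
from subprocess import Popen, PIPE

def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')):
    # Run the command and yield its stdout one line at a time (as bytes).
    p = Popen(args, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL,
              bufsize=1, close_fds=True)
    for line in iter(p.stdout.readline, b''): # expect b'\n' newline
        yield line
    p.stdout.close()
    # On Python 3.3+ this becomes StopIteration(exit_status); an explicit
    # `raise StopIteration(...)` in a generator is a RuntimeError since 3.7.
    return p.wait()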

Example:

# ...
self.lines = iter_lines(sys.executable, '-u', 'rand_lines.py')

#...
def nextTuple(self):
    try:
        line = next(self.lines).decode('ascii', 'ignore')
    except StopIteration as e:
        self.exit_status = e.args[0]
    else:
        storm.emit([line.strip()])
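
If nextTuple() must return quickly even when the child has produced nothing yet, one possible variation (a different technique from the generator above, sketched here assuming Python 3) is to read the child on a background thread and hand lines over through a queue. The class name LineSource and its poll_line method are hypothetical names for illustration, not part of Storm's API.

```python
import sys
import threading
import time
from queue import Queue, Empty
from subprocess import Popen, PIPE


class LineSource:
    """Reads a child's stdout on a background thread into a queue."""

    def __init__(self, *args):
        self.queue = Queue()
        self.done = threading.Event()
        self.p = Popen(args, stdout=PIPE)
        threading.Thread(target=self._pump, daemon=True).start()

    def _pump(self):
        # A blocking readline() is fine here: only this thread waits.
        for line in iter(self.p.stdout.readline, b''):
            self.queue.put(line)
        self.p.stdout.close()
        self.done.set()  # set only after the last line has been queued

    def poll_line(self):
        """Return the next line as bytes, or None if none is ready now."""
        try:
            return self.queue.get_nowait()
        except Empty:
            return None


# In a spout, nextTuple() could then look like this (sketch):
#     line = self.source.poll_line()
#     if line is not None:
#         storm.emit([line.decode('ascii', 'ignore').strip()])

if __name__ == '__main__':
    # Hypothetical stand-in for rand_lines.py.
    src = LineSource(sys.executable, '-u', '-c',
                     "for i in range(3): print(i)")
    while not (src.done.is_set() and src.queue.empty()):
        line = src.poll_line()
        if line is None:
            time.sleep(0.05)  # nothing ready; a spout would just return
        else:
            print(line.decode('ascii', 'ignore').strip())
    src.p.wait()
```

The trade-off is an extra thread per child process; in exchange, the caller never waits on the pipe itself.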
answered 2014-04-25T06:07:25.337