0

我正在尝试创建一个 Python 应用程序,其中一个进程(进程'A')接收请求并将其放入 ProcessPool(来自 concurrent.futures)。在处理此请求时,可能需要将消息传递给第二个进程(进程“B”)。我正在使用 tornado 的 iostream 模块来帮助包装连接并获得响应。

进程 A 未能从 ProcessPool 执行中成功连接到进程 B。我哪里错了?

向进程 A 发出初始请求的客户端:

#!/usr/bin/env python

import socket
import tornado.iostream
import tornado.ioloop

def print_message ( data ):
    print 'client received', data

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(('localhost',2001))
stream.read_until('\0',print_message)
stream.write('test message\0')
tornado.ioloop.IOLoop().instance().start()

收到初始请求的进程 A:

#!/usr/bin/env python

import tornado.ioloop
import tornado.tcpserver
import tornado.iostream
import socket
import concurrent.futures
import functools

def handle_request ( data ):
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
    out_stream = tornado.iostream.IOStream(s)
    out_stream.connect(('localhost',2002))
    future = out_stream.read_until('\0')
    out_stream.write(data+'\0')
    return future.result()

class server_a (tornado.tcpserver.TCPServer):

   def return_response ( self, in_stream, future ):
       in_stream.write(future.result()+'\0')

   def handle_read ( self, in_stream, data ):
       future = self.executor.submit(handle_request,data)
       future.add_done_callback(functools.partial(self.return_response,in_stream))

   def handle_stream ( self, in_stream, address ):
       in_stream.read_until('\0',functools.partial(self.handle_read,in_stream))

   def __init__ ( self ):
       self.executor = concurrent.futures.ProcessPoolExecutor()
       tornado.tcpserver.TCPServer.__init__(self)

server = server_a()
server.bind(2001)
server.start(0)
tornado.ioloop.IOLoop().instance().start()

进程 B,它应该接收来自进程 A 的中继请求:

#!/usr/bin/env python

import tornado.ioloop
import tornado.tcpserver
import functools

class server_b (tornado.tcpserver.TCPServer):

    def handle_read ( self, in_stream, data ):
        in_stream.write('server B read'+data+'\0')

    def handle_stream ( self, in_stream, address ):
       in_stream.read_until('\0',functools.partial(self.handle_read,in_stream))

server = server_b()
server.bind(2002)
server.start(0)
tornado.ioloop.IOLoop().instance().start()

最后,进程 A 返回的错误,在“read_until”方法期间引发:

ERROR:concurrent.futures:exception calling callback for <Future at 0x10654b890 state=finished raised OSError>
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 299, in _invoke_callbacks
    callback(self)
  File "./a.py", line 26, in return_response
    in_stream.write(future.result()+'\0')
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
OSError: [Errno 9] Bad file descriptor
4

2 回答 2

0

我不是 100% 确定你为什么会收到这个“错误的文件描述符”错误(不幸的是,concurrent.futures 在向后移植到 2.7 时丢失了回溯信息),但是 ProcessPoolExecutor 的工作进程中没有运行 IOLoop,所以你赢了'不能在这种情况下使用像 IOStream 这样的 Tornado 结构(除非您为每个任务启动一个新的 IOLoop,但这可能没有多大意义,除非您需要与其他异步库兼容)。

我也不确定以这种方式混合龙卷风的多进程模式和 ProcessPoolExecutor 是否有效。我认为您可能需要将 ProcessPoolExecutor 的初始化移动到 start(0) 调用之后。

于 2014-05-11T22:44:41.013 回答
0

好的,我已经解决了这个问题,方法是更新进程 A:

def stop_loop ( future ):
    tornado.ioloop.IOLoop.current().stop()

def handle_request ( data ):
    tornado.ioloop.IOLoop.clear_current()
    tornado.ioloop.IOLoop.clear_instance()
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
    out_stream = tornado.iostream.IOStream(s)
    out_stream.connect(('localhost',2002))
    future = out_stream.read_until('\0')
    future.add_done_callback(stop_loop)
    out_stream.write(data+'\0')
    tornado.ioloop.IOLoop.instance().start()
    return future.result()

尽管 IOLoop 之前没有在生成的进程中启动,但在调用当前实例时它正在返回其父循环。清除这些引用允许启动流程的新循环。我不正式知道这里发生了什么。

于 2014-05-12T08:33:37.313 回答