我正在尝试编写一个小程序来发送和接收 UDP 流量并通过 HTTP 接口接收命令。HTTP 服务器合二为一multiprocessing.Process
;UDP服务器位于另一个。这两个进程通过 python 进行通信multiprocessing.Pipe
。我在下面附上了完整的代码。
我有2个相关问题:
- 如何在 python 中使用 kqueue 处理多个文件描述符/kevents(套接字文件描述符有效,管道文件描述符似乎不 - 不确定我使用的管道是否等同于文件)?
- 如何区分这些 kevents,以便在读取管道和套接字时应用不同的功能?
我希望我的 UDP 服务器执行的伪代码:
kq = new kqueue
udpEvent = kevent when socket read
pipeEvent = kevent when pipe read
while:
for event in kq.conrol([udpEvent, pipeEvent]):
if event == udpEvent:
# do something
elif event == pipeEvent:
print "HTTP command via pipe:", pipe.recv()
现在,UDP 服务器识别套接字事件并正确读取套接字。但是,当我将管道 kevent 添加到 kqueue 时,程序会不停地吐出管道事件。我将过滤器设置为已写入管道,但我假设 1)这是错误的 2)更具体地说,pythonmultiprocessing.Pipe
就像一个常规的 unix 管道,需要以不同的方式处理。
.....
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 ^C<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
主文件
import sys
from multiprocessing import Process, Pipe
# from userinterface import OSXstatusbaritem # use like so: OSXstatusbaritem.start(pipe)
from server import Server
import handler # UI thingy
# For UI, use simple HTTP server with various endpoints
# open a connection: localhost:[PORT]/open/[TARGET_IP]
def startServer(pipe):
UDP_IP = "127.0.0.1"
UDP_PORT = 9000
print "starting server"
s = Server(pipe)
s.listen(UDP_IP, UDP_PORT)
print "finishing server"
import BaseHTTPServer
def startUI(pipe):
HTTP_PORT = 4567
server_class = BaseHTTPServer.HTTPServer
myHandler = handler.handleRequestsUsing(pipe)
httpd = server_class(('localhost', 4567), myHandler)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
def main():
# Named full duplex pipe for communicating between server process and UI
pipeUI, pipeServer = Pipe()
# Start subprocesses
pServer = Process(target=startServer, args=(pipeServer,))
pServer.start()
startUI(pipeUI)
pServer.join()
if __name__ == "__main__": sys.exit(main())
server.py (UDP)
import sys
import select # for kqueue
from socket import socket, AF_INET, SOCK_DGRAM
from multiprocessing import Process, Pipe
class Server:
def __init__(self, pipe):
self.pipe = pipe
def listen (self, ipaddress, port):
print "starting!"
# Initialize listening UDP socket
sock = socket(AF_INET, SOCK_DGRAM)
sock.bind((ipaddress, port))
# Configure kqueue
kq = select.kqueue()
# Event for UDP socket data available
kevent0 = select.kevent( sock.fileno(),
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR)
# Event for message queue from other processes (ui)
kevent1 = select.kevent( self.pipe.fileno(),
filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE)
# TODO: Figure out how to handle multiple kevents on kqueue
# TODO: Need an event for TUN data
# Start kqueue
while True:
revents = kq.control([kevent0, kevent1], 1, None)
for event in revents:
print event
kq.close()
# close file descriptors (os.close(fd))
handler.py(HTTP 接口)
import BaseHTTPServer
# Simple HTTP endpoints for controlling prototype Phantom implementation.
# The following commands are supported:
# 1. Open a connection via /open/[IP]:[PORT]
# 2. ????
class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
pipe = None
def __init__(self, pipe, *args):
RequestHandler.pipe = pipe
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)
def do_HEAD(s):
s.send_response(200)
s.send_header("Content-type", "application/json")
s.end_headers()
def do_GET(s):
s.send_response(200)
s.send_header("Content-type", "application/json")
s.end_headers()
# Open connection command
if s.path.startswith('/open/'):
addrStr = s.path[6:len(s.path)]
(address, port) = tuple(filter(None, addrStr.split(':')))
port = int(port)
print "opening address: ", address, "port:", port
RequestHandler.pipe.send(['open', address, port])
def handleRequestsUsing(logic):
return lambda *args: RequestHandler(logic, *args)
更新:
我用 select 重写了服务器监听方法。对于一个不会使用超过 3 或 4 个 fds 的慢速 Python 原型,速度无论如何都无关紧要。Kqueue 将成为另一天的主题。
def listen (self, ipaddress, port): print "starting!"
# Initialize listening non-blocking UDP socket
sock = socket(AF_INET, SOCK_DGRAM)
sock.setblocking(0)
sock.bind((ipaddress, port))
inputs = [sock, self.pipe] # stuff we read
outputs = [] # stuff we expect to write
while inputs:
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for event in readable:
if event is sock:
self.handleUDPData( sock.recvfrom(1024) )
if event is self.pipe:
print "pipe event", self.pipe.recv()