我不确定它是否适合用于测试用例,也不知道有什么现成的解决方案(虽然我很确定肯定有——只是不知道在哪里),而且这看起来是个很有趣的问题,所以我写了一个小 Python 脚本来实现它。它只能在 Python 3 上运行,因此请确保您已经安装了 Python 3。(在撰写本文时,Python 2 更为常见。)
#!/usr/bin/env python3
# Latest version at http://stackoverflow.com/a/16494059/200291
"""
Accept TCP connections, usually proxying to another address, but occasionally
responding with 404 Not Found.
"""
import socket
import select
# Canned response written to connections that are selected to be dropped:
# an HTTP/1.0 404 status line, an empty header section, and a short body.
DROP_DATA = (
    b'HTTP/1.0 404 Not Found\r\n'
    b'\r\n'
    b'not found\r\n'
)

# Maximum number of bytes requested from recv() in one call while proxying.
PROXY_BUFFER_SIZE = 4096
class Reactor(object):
    """Minimal select()-based event loop driving a list of reactants.

    A reactant is any object exposing ``modify(reactor, r, w, x)`` and
    ``notify(reactor, r, w, x)``.
    """

    def __init__(self):
        # Reactants currently taking part in the loop, in insertion order.
        self.reactants = []

    def add_reactant(self, reactant):
        """Register *reactant* so it takes part in later iterations."""
        self.reactants.append(reactant)

    def remove_reactant(self, reactant):
        """Unregister *reactant*; ``list.remove`` raises ValueError if absent."""
        self.reactants.remove(reactant)

    def iterate(self):
        """Run one round: collect interest sets, select(), then notify."""
        readers, writers, exceptional = [], [], []
        # Iterate over snapshots: reactants may add or remove reactants
        # while they are being asked or notified.
        for participant in list(self.reactants):
            participant.modify(self, readers, writers, exceptional)
        # No timeout is passed, so this waits until something is ready.
        ready = select.select(readers, writers, exceptional)
        for participant in list(self.reactants):
            participant.notify(self, *ready)

    def loop(self):
        """Keep iterating until no reactants remain registered."""
        while self.reactants:
            self.iterate()
class Acceptor(object):
    """Reactant that accepts connections on a socket and hands them off.

    ``action`` is called with whatever ``socket.accept()`` returns.
    """

    def __init__(self, socket, action):
        self.action = action
        self.socket = socket

    def modify(self, reactor, r, w, x):
        """Ask to be told when the socket is readable."""
        r.append(self.socket)

    def notify(self, reactor, r, w, d):
        """Accept one connection if the socket was reported readable."""
        if self.socket not in r:
            return
        self.action(self.socket.accept())
class Dropper(object):
    """Reactant that writes DROP_DATA to a socket and then closes it."""

    def __init__(self, socket):
        self.socket = socket
        # Bytes of the canned response that still have to be sent.
        self.queued = DROP_DATA

    def modify(self, reactor, r, w, x):
        """Ask to be told when the socket is writable."""
        w.append(self.socket)

    def notify(self, reactor, r, w, x):
        """Send the next chunk, or shut down and close once all is sent.

        If the peer has already disconnected, ``send`` or ``shutdown`` can
        raise OSError. That is treated as "nothing left to deliver": the
        socket is still closed and this reactant removed, so one vanished
        client neither leaks a socket nor stops the reactor loop.
        """
        if self.socket not in w:
            return
        try:
            if self.queued:
                # send() may accept only part of the data; keep the rest.
                written = self.socket.send(self.queued)
                self.queued = self.queued[written:]
                return
            self.socket.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Peer went away mid-response; fall through to the cleanup.
            pass
        self.socket.close()
        reactor.remove_reactant(self)
class Connector(object):
    """Reactant that starts a non-blocking TCP connect and reports back.

    ``done`` is called with the socket once select() reports it writable.
    """

    def __init__(self, addr, done):
        self.done = done
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        # connect_ex() returns an error code instead of raising; the result
        # is ignored here and completion is awaited via writability.
        sock.connect_ex(addr)
        self.socket = sock

    def modify(self, reactor, r, w, x):
        """Ask to be told when the socket is writable."""
        w.append(self.socket)

    def notify(self, reactor, r, w, x):
        """Hand the socket to ``done`` once it has become writable."""
        if self.socket not in w:
            return
        # NOTE(review): SO_ERROR is not inspected, so a failed connect
        # appears to be handed to ``done`` like a successful one - confirm.
        self.socket.setblocking(True)
        reactor.remove_reactant(self)
        self.done(self.socket)
class SimplexProxy(object):
    """Reactant that copies bytes one way, from ``source`` to ``dest``.

    When ``source`` reports end-of-stream, the write side of ``dest`` is
    shut down, the proxy removes itself from the reactor, and ``done`` is
    called with no arguments.
    """

    def __init__(self, source, dest, done):
        self.source = source
        self.dest = dest
        # Data read from source that has not yet been written to dest.
        self.buffer = b''
        # Set once source returned b'' (EOF); dest still needs a shutdown.
        self.want_shutdown = False
        self.done = done

    def modify(self, reactor, r, w, x):
        """Wait for dest to be writable if work is pending, else read."""
        if self.buffer or self.want_shutdown:
            w.append(self.dest)
        else:
            r.append(self.source)

    def notify(self, reactor, r, w, x):
        """Read from source and/or flush to dest, as select() allows."""
        if self.source in r:
            read = self.source.recv(PROXY_BUFFER_SIZE)
            if not read:
                self.want_shutdown = True
            else:
                self.buffer += read
        if self.dest in w:
            if self.want_shutdown:
                self.dest.shutdown(socket.SHUT_WR)
                reactor.remove_reactant(self)
                # Report completion. Previously the callback was stored but
                # never invoked, so whoever supplied it was never told that
                # this direction had finished.
                if self.done is not None:
                    self.done()
            else:
                # send() may accept only part of the buffer; keep the rest.
                written = self.dest.send(self.buffer)
                self.buffer = self.buffer[written:]
def join_sockets(reactor, socket_1, socket_2):
    """Register a pair of SimplexProxy reactants, one for each direction.

    Both proxies receive the same ``done`` callback; on its second call it
    closes both sockets.
    """
    finished = 0

    def done():
        nonlocal finished
        finished += 1
        if finished < 2:
            return
        socket_1.close()
        socket_2.close()

    directions = ((socket_1, socket_2), (socket_2, socket_1))
    for source, dest in directions:
        reactor.add_reactant(SimplexProxy(source, dest, done))
def run_server(reactor, addr, action, backlog=5):
    """Open a listening TCP socket on *addr* and register an Acceptor for it.

    *action* is passed to the Acceptor; *backlog* is passed to ``listen()``.
    Any exception raised while configuring the socket is re-raised after
    the socket has been closed, so a failed bind (for example, address
    already in use) does not leave an open descriptor behind.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        sock.bind(addr)
        sock.listen(backlog)
    except Exception:
        sock.close()
        raise
    reactor.add_reactant(Acceptor(sock, action))
def proxy_connect(reactor, our_socket, their_addr):
    """Connect to *their_addr* and then join that socket with *our_socket*."""

    def bridge(upstream):
        # Called by the Connector with the newly connected socket.
        join_sockets(reactor, our_socket, upstream)

    connector = Connector(their_addr, bridge)
    reactor.add_reactant(connector)
def run_dropper(reactor, our_addr, their_addr, drop_freq):
    """Listen on *our_addr*; proxy to *their_addr*, dropping some connections.

    Every connection whose running count is a multiple of *drop_freq* is
    handed to a Dropper; all others are proxied.
    """
    seen = 0

    def action(accepted):
        nonlocal seen
        seen += 1
        client, _peer = accepted
        if seen % drop_freq:
            proxy_connect(reactor, client, their_addr)
        else:
            reactor.add_reactant(Dropper(client))

    run_server(reactor, our_addr, action)
def main():
    """Parse the command line, set up the dropping proxy and run the loop."""
    import argparse

    # The names of these converters are kept as-is: argparse includes the
    # function name in the error message it prints when one raises ValueError.
    def parse_port(s):
        p = int(s)
        if 0 <= p < 65536:
            return p
        raise ValueError("invalid port")

    def parse_natural(s):
        n = int(s)
        if n <= 0:
            raise ValueError("must be a natural number")
        return n

    def resolve(host, port):
        # Use the socket address of the first IPv4/TCP result.
        candidates = socket.getaddrinfo(host, port,
                                        socket.AF_INET, socket.SOCK_STREAM)
        first = candidates[0]
        return first[-1]

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('-H', '--local-host', metavar='HOST',
                        default='0.0.0.0', help="address to listen on")
    parser.add_argument('-p', '--local-port', type=parse_port, metavar='PORT',
                        help="local port to listen on", required=True)
    parser.add_argument('remote_host', help="host to connect to")
    parser.add_argument('remote_port', type=parse_port,
                        help="port to connect to")
    parser.add_argument('-D', '--drop-frequency', type=parse_natural,
                        metavar='FREQ', help="how many requests per drop",
                        required=True)
    args = parser.parse_args()

    local = resolve(args.local_host, args.local_port)
    remote = resolve(args.remote_host, args.remote_port)

    reactor = Reactor()
    run_dropper(reactor, local, remote, args.drop_frequency)
    try:
        reactor.loop()
    except KeyboardInterrupt:
        # Ctrl-C ends the loop without a traceback.
        pass
# Run the proxy only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
要用它在本地端口 9000 上代理 localhost:8000,并且每 5 个连接中丢弃 1 个(以 404 响应),请像这样运行它:
% ./dropproxy.py localhost 8000 -p 9000 -D 5