
I'm writing a test harness (TH) for a UDP app that opens a tunnel to another instance of the same app. To send data through the tunnel, each UDP app has a named pipe associated with it, opened in read+write mode (instead of a TUN interface, which I'll use in the future). Each UDP app has several subprocesses, and the TH launches the UDP apps as subprocesses in different threads, and also sends data over the pipes from subprocesses. It's a mess, but I want to get a simple case working first, because I eventually want to build tunnels over 5+ apps.

2 Problems:

  1. The select() loop on the first app is not getting all the messages sent by TH, just the first. Does select() not notify each time a file has new content and is ready for reading? I currently use pipe.readline(); changing to pipe.readlines() gets me most of the messages, but not all. See below for more detail.
  2. On the second app, I attempt to write the received UDP data to the pipe (a different one than the first). TH never gets the data.

Here's the flow of data through the tunnel from TH:

  • TH print >> fifo1, "foo"
  • fifo1
  • UDP app #1 receive fifo1, send UDP packet
  • UDP tunnel
  • UDP app #2 receive UDP packet, send fifo2
  • fifo2
  • TH fifo2.readline()

Naturally, this isn't working at all. When I send several messages from TH over fifo1 to app #1, only the first message gets through to app #1. This message is correctly transmitted over the UDP tunnel to app #2. App #2 writes to fifo2, but in TH I don't see anything in fifo2. I've included the console output when I run the test harness to illustrate this.

Sending data one way over tunnel...
rxDataFifo: reading from pipe
# Here we start sending messages (hello 0, hello 1, hello 2) via fifo1
i= 0
i= 1
i= 2
txDataFifo: finished # Now we are finished sending
handleFifoData: hello 0 # first message received on UDP app #1
UDP ('127.0.0.1', 9000):  fde04a5a1de6d473b70c184e5e981279hello 0 # Here's the message app #2 recv via UDP
...writing out to fifo! hello 0 # And app #2 write out to fifo2 (or should, at least)
# Nothing received from fifo2 on test harness!??

Problem 1: On the app #1 side, the select loop doesn't fire a readable event each time the pipe is written. If, instead of reading a single line from the pipe, I read all available lines, I get the last two messages (but not the first).

Sending data one way over tunnel...
rxDataFifo: reading from pipe
i= 0
i= 1
i= 2
txDataFifo: finished
handleFifoData: hello 1
handleFifoData: hello 2
UDP ('127.0.0.1', 9000):  e3270c47214147ae3698aeecc13e2acehello 1
tunnel data: hello 1
...writing out to fifo! hello 1
UDP ('127.0.0.1', 9000):  e3270c47214147ae3698aeecc13e2acehello 2
tunnel data: hello 2
...writing out to fifo! hello 2

Problem 2: My theory is that TH tries to read the pipe before it has any content. But that doesn't make sense, because I read fifo2 in a while loop. I tried some simpler approaches, but they didn't work either.


Relevant portion of test harness:

def main():
    # Create pipes for each process we want to launch
    # NOTE: These pipes are passed to the multiprocessing.Process and subprocess.call
    # that we use to launch the UDP apps
    tmpdir = tempfile.mkdtemp()
    pipe0 = os.path.join(tmpdir, 'pipe0')
    pipe1 = os.path.join(tmpdir, 'pipe1')
    try:
        os.mkfifo(pipe0)
        os.mkfifo(pipe1)
    except OSError, e:
        print "Failed to create FIFO: %s" % e

    #...

    queue = Queue() # this where output goes
    num_msg = 10 # number of messages to send over tunnel
    txFifo = Process(target=txDataFifo, args=(pipe0, queue, num_msg))
    rxFifo = Process(target=rxDataFifo, args=(pipe1, queue))
    rxFifo.start()
    txFifo.start()

def txDataFifo(pipe_name, queue, num_msg):
    fifo = open(pipe_name, 'r+b')
    # print >> fifo, "hello over named pipe" # write stuff to fifo


    for i in range(0, 3):
        print "i=",i
        print >> fifo, "hello "+str(i)

    fifo.close()
    print "txDataFifo: finished"

def rxDataFifo(pipe_name, queue):
    print "rxDataFifo: reading from pipe"
    fifo = open(pipe_name, 'w+b')
    while True:
        data = fifo.readline()
        print 'rxDataFifo:', data
        queue.put(data)
    fifo.close()
    print "txDataFifo: reading from pipe"

The select loop of the UDP app and handlers for UDP and named pipe data:

def listen (self, ipaddress, udpport, testport, pipe_filename):
    # ...
    inputs = [sock, self.pipe] # stuff we read
    outputs = [] # stuff we expect to write

    # Set up the named pipe that we use to simulate the TUN interface
    # and use to communicate with the test harness
    fifo = None
    if pipe_filename:
        print "Opening pipe="+pipe_filename+" for IPC with test harness"
        fifo = open(pipe_filename, 'r+b')
        inputs.append(fifo)

    while inputs:
        readable, writable, exceptional = select.select(inputs, outputs, inputs)

        for event in readable:
            if fifo and event is fifo:
                # Handle data from test harness simulating TUN (via pipe)
                self.handleFifoData(sock, fifo)
            if event is sock:
                # Handle tunnel/setup request data
                self.handleUDPData(sock, fifo)
            if event is self.pipe:
                # Handle commands from the UI in the other process (IPC)
                data = self.pipe.recv()
                print "pipe event", data
                if data[0] == 'open':
                    # NOTE: For open command, data[1] and data[2] are
                    # an IP address and port, respectively
                    connId = self.generateConnId()
                    msg = connId + 'setup'
                    sock.sendto(msg, (data[1], data[2]))
                    self.mySetup[connId] = SetupInfo(data[1], data[2])
        # Handle exceptional?    

def handleFifoData (self, sock, fifo, ):
    # Send data from sockTest to sock
    data = fifo.readline().rstrip()

    print "handleFifoData:", data
    for peerId in self.tunnels:
        # TODO: perhaps FIFO message should be parsed json, so we know which client to send them to?
        peer = self.tunnels[peerId]
        msg = peerId + data
        sock.sendto(msg, (peer.address, peer.port))

def handleUDPData (self, sock, fifo): # reads a tuple 
    (data, addr) = sock.recvfrom(1024)
    print "UDP "+str(addr)+": ", data

    # ...
    elif peerId in self.tunnels: # We are functioning as a relay for this node
        # NOTE: This is where TUN interface forwarding would happen
        print "tunnel data:", data
        if fifo:
            print "...writing out to fifo!", data
            print >> fifo, data+'\n'
        return
    # ...

1 Answer


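You probably don't see anything because the data is stuck in the stdio buffer; try adding fifo.flush() after the print. In general, you should be wary of mixing buffered IO and select.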
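A minimal sketch of both buffering effects, under some assumptions: it is written for Python 3 rather than the question's Python 2, it uses an anonymous os.pipe() as a stand-in for the named pipes, it assumes a POSIX system where select() accepts pipe descriptors, and the helper names (is_readable, demo_write_buffering, demo_read_buffering) are made up for illustration. The first demo shows a buffered write staying invisible to select() until flush(); the second shows a buffered readline() pulling every queued message out of the pipe at once, so select() has nothing further to report even though two lines are still waiting in the file object's buffer.

```python
import os
import select


def is_readable(fd, timeout=0.2):
    """Return True if select() reports fd as readable within the timeout."""
    readable, _, _ = select.select([fd], [], [], timeout)
    return bool(readable)


def demo_write_buffering():
    """A buffered write is not visible to the reader until flush()."""
    r_fd, w_fd = os.pipe()
    writer = os.fdopen(w_fd, "wb")      # buffered file object
    writer.write(b"hello 0\n")          # sits in the userspace buffer
    before_flush = is_readable(r_fd)
    writer.flush()                      # now the bytes reach the pipe
    after_flush = is_readable(r_fd)
    writer.close()
    os.close(r_fd)
    return before_flush, after_flush


def demo_read_buffering():
    """One buffered readline() drains the pipe, so select() goes quiet."""
    r_fd, w_fd = os.pipe()
    os.write(w_fd, b"hello 0\nhello 1\nhello 2\n")
    reader = os.fdopen(r_fd, "rb")      # buffered file object
    first = reader.readline()           # reads all pending bytes internally
    still_readable = is_readable(r_fd)  # the pipe itself is now empty
    rest = [reader.readline(), reader.readline()]  # served from the buffer
    reader.close()
    os.close(w_fd)
    return first, still_readable, rest
```

This matches the symptoms in the question: a handler that calls readline() once per select() event only sees the first message, and a writer that never flushes may deliver nothing at all.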

Better yet, switch from print and file.readline() to os.write() and os.read(), which don't buffer and have more predictable behavior with respect to select().
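One way this could look, as a sketch rather than a drop-in replacement for the question's handlers: it assumes Python 3 and a POSIX system, uses os.pipe() in place of the named pipes so it runs on its own, and the helper names send_line and drain_lines are hypothetical. Because os.read() returns raw chunks instead of lines, the reader keeps any unterminated tail and splits complete messages on the newline itself.

```python
import os
import select


def send_line(fd, text):
    """Write one newline-terminated message straight to the descriptor."""
    data = text.encode() + b"\n"
    while data:                          # os.write may write only part
        written = os.write(fd, data)
        data = data[written:]


def drain_lines(fd, pending=b"", timeout=0.2):
    """Read while select() reports data; return (lines, leftover bytes)."""
    lines = []
    while True:
        readable, _, _ = select.select([fd], [], [], timeout)
        if not readable:                 # nothing more for now
            break
        chunk = os.read(fd, 4096)        # unbuffered: only what the pipe holds
        if not chunk:                    # EOF: all writers closed
            break
        pending += chunk
        # Everything before the last newline is complete; keep the tail.
        *complete, pending = pending.split(b"\n")
        lines.extend(complete)
    return lines, pending


def demo():
    """Send three full messages plus an unterminated tail, then drain."""
    r_fd, w_fd = os.pipe()
    try:
        for i in range(3):
            send_line(w_fd, "hello %d" % i)
        os.write(w_fd, b"partial")
        return drain_lines(r_fd)
    finally:
        os.close(r_fd)
        os.close(w_fd)
```

In the question's setup, the descriptor would presumably come from something like os.open(pipe_filename, os.O_RDWR), mirroring the existing 'r+b' mode, and that descriptor (an integer) can be placed directly in the select() input list. The leftover bytes returned by drain_lines would be passed back in as pending on the next readable event.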

answered 2013-06-29T08:24:04.277