0

我不得不在一个项目中使用 rtp/rtcp,经过一番搜索,我在 github 上找到了 https://github.com/jonbo372/efflux 这个 java 项目。这个项目没有一个完整的工作示例,所以在多次尝试之后,我写了一些测试文件并让它们运行起来。在我的设置中,有一个 python 服务器,它打开两个端口,一个用于 rtp,另一个用于 rtcp,并在这两个端口上接收数据;还有一个客户端,它使用 efflux 库通过 rtp/rtcp 协议发送数据。

服务器代码:

import socket, simpleaudio as sa
import threading, queue
from multiplex import *
from threading import Thread
import time
class ServerUDP:
    """UDP endpoint with one socket for RTP data and one for RTCP control.

    Both sockets are bound to an OS-assigned port (port 0) on ``host``.
    Received datagrams are printed and queued, together with the sender
    address whose port has been rewritten to the fixed port the client is
    expected to listen on.
    """

    existing = []
    joined = []

    # Ports substituted into the sender address of every received packet.
    # They match the ports the accompanying Java client binds locally
    # (10000 for data, 20001 for control).
    RTP_REPLY_PORT = 10000
    RTCP_REPLY_PORT = 20001

    # Upper bound on bind retries, so a persistent failure surfaces as an
    # exception instead of an endless print loop.
    _BIND_ATTEMPTS = 5

    def __init__(self, host='127.0.0.1'):
        """Open and bind the RTP and RTCP sockets.

        Args:
            host: Local address to bind. The default (loopback) only accepts
                traffic from the same machine; pass ``'0.0.0.0'`` or the
                machine's LAN address to accept packets from other hosts.

        Raises:
            OSError: If a socket cannot be bound after several attempts.
        """
        self.s = self._bind_socket(host, "Couldn't bind to that RTP port")
        self.clients = set()
        self.recvPackets = queue.Queue()

        self.s2 = self._bind_socket(host, "Couldn't bind to that RTCP port")
        self.clients2 = set()
        self.recvPackets2 = queue.Queue()

    @classmethod
    def _bind_socket(cls, host, failure_message):
        """Return a UDP socket bound to an ephemeral port on ``host``."""
        last_error = None
        for _ in range(cls._BIND_ATTEMPTS):
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                sock.bind((host, 0))
            except OSError as err:
                # Release the descriptor before retrying so that failed
                # attempts do not leak sockets.
                sock.close()
                print(failure_message)
                last_error = err
            else:
                return sock
        raise OSError(failure_message) from last_error

    @staticmethod
    def _receive_loop(sock, clients, packets, reply_port, delay, run_event):
        """Queue datagrams from ``sock`` until ``run_event`` is cleared."""
        # A receive timeout lets the loop re-check run_event periodically;
        # without it recvfrom() blocks forever and the thread can never be
        # stopped. A missing or non-positive delay keeps blocking mode.
        sock.settimeout(delay if delay and delay > 0 else None)
        while run_event.is_set():
            try:
                data, addr = sock.recvfrom(1600)
            except socket.timeout:
                continue
            print(addr, "read", data)
            rewritten = list(addr)
            rewritten[1] = reply_port
            addr = tuple(rewritten)
            clients.add(addr)
            # Keep the payload together with the (rewritten) sender address.
            packets.put((data, addr))

    def get_ports(self):
        """Return the ``(host, port)`` the RTP socket is bound to."""
        return self.s.getsockname()

    def get_ports2(self):
        """Return the ``(host, port)`` the RTCP socket is bound to."""
        return self.s2.getsockname()

    def RecvData(self, name, delay, run_event):
        """Receive RTP datagrams until ``run_event`` is cleared.

        Args:
            name: Label for the worker; not used by the loop.
            delay: Seconds to wait for a packet before re-checking
                ``run_event``, i.e. the maximum shutdown latency.
            run_event: ``threading.Event``; the loop runs while it is set.
        """
        self._receive_loop(self.s, self.clients, self.recvPackets,
                           self.RTP_REPLY_PORT, delay, run_event)

    def RecvData2(self, name, delay, run_event):
        """Receive RTCP datagrams until ``run_event`` is cleared.

        Args:
            name: Label for the worker; not used by the loop.
            delay: Seconds to wait for a packet before re-checking
                ``run_event``, i.e. the maximum shutdown latency.
            run_event: ``threading.Event``; the loop runs while it is set.
        """
        self._receive_loop(self.s2, self.clients2, self.recvPackets2,
                           self.RTCP_REPLY_PORT, delay, run_event)

    def close(self):
        """Close the RTP socket."""
        self.s.close()

    def close2(self):
        """Close the RTCP socket."""
        self.s2.close()

if __name__ == "__main__":
    # Start one server and run its RTP and RTCP receive loops on
    # background threads; each loop runs while its event stays set.
    roomserver = ServerUDP()
    run_event = threading.Event()
    run_event.set()
    run_event2 = threading.Event()
    run_event2.set()

    d1 = 1
    d2 = 1
    # Daemon threads: a receiver still blocked inside recvfrom() must not
    # be able to keep the process alive after Ctrl-C.
    t = Thread(target=roomserver.RecvData, args=("bob", d1, run_event),
               daemon=True)
    t2 = Thread(target=roomserver.RecvData2, args=("allen", d2, run_event2),
                daemon=True)
    t.start()
    t2.start()

    # The sockets are bound to OS-assigned ports; print them so the client
    # can be configured with the actual values.
    port = roomserver.get_ports()[1]
    port2 = roomserver.get_ports2()[1]
    print("port is", port)
    print("port2 is", port2)
    try:
        while 1:
            time.sleep(.1)
    except KeyboardInterrupt:
        print("attempting to close threads. Max wait =",d1)
        run_event.clear()
        run_event2.clear()
        # Bound the wait so shutdown honours the advertised maximum even
        # if a receiver never returns from recvfrom().
        t.join(d1)
        t2.join(d2)
        if t.is_alive() or t2.is_alive():
            print("threads still waiting for data; exiting anyway")
        else:
            print("threads successfully closed")
    finally:
        # Release both UDP sockets on every exit path.
        roomserver.close()
        roomserver.close2()

服务器将打印与客户端进行 udp 通信所需的两个端口。

客户端代码:

import com.biasedbit.efflux.packet.AppDataPacket;
import com.biasedbit.efflux.packet.CompoundControlPacket;
import com.biasedbit.efflux.packet.ControlPacket;
import com.biasedbit.efflux.packet.DataPacket;
import com.biasedbit.efflux.participant.RtpParticipant;
import com.biasedbit.efflux.participant.RtpParticipantInfo;
import com.biasedbit.efflux.session.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.util.HashedWheelTimer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.UUID;
import com.biasedbit.efflux.participant.SsrcGenerator;

/**
 * Minimal efflux client: creates N local RTP sessions, registers the remote
 * server as a receiver on the first one and sends it a burst of test packets.
 *
 * Optional command-line arguments override the remote endpoint, which is
 * useful because the server prints new ports on every run:
 * {@code [remoteHost [remoteDataPort [remoteControlPort]]]}.
 */
public class test {

    private static final byte N = 1;
    private static MultiParticipantSession[] sessions;

    // Address the local participant is created with. The wildcard address is
    // used instead of 127.0.0.1 so that packets can leave through the
    // machine's network interface.
    // NOTE(review): efflux appears to bind the session's sockets to the local
    // participant's address, so a loopback address would keep packets from
    // reaching another host - confirm against AbstractRtpSession.init().
    private static final String LOCAL_BIND_HOST = "0.0.0.0";

    // Remote (server) endpoint used when no arguments are given.
    private static final String DEFAULT_REMOTE_HOST = "192.168.43.11";
    private static final int DEFAULT_REMOTE_DATA_PORT = 63302;
    private static final int DEFAULT_REMOTE_CONTROL_PORT = 51755;

    public static void main(String[] args) {
        final String remoteHost = args.length > 0 ? args[0] : DEFAULT_REMOTE_HOST;
        final int remoteDataPort =
                args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_REMOTE_DATA_PORT;
        final int remoteControlPort =
                args.length > 2 ? Integer.parseInt(args[2]) : DEFAULT_REMOTE_CONTROL_PORT;

        // One session per local participant; each session can have remote
        // participants added to it as receivers.
        sessions = new MultiParticipantSession[N];
        final AtomicInteger[] counters = new AtomicInteger[N];
        final CountDownLatch latch = new CountDownLatch(N);

        for (byte i = 0; i < N; i++) {
            final int temp = i;
            long ssrc = 2;
            // The local participant is identified by its SSRC and is given a
            // local address plus two ports: one for data packets and one for
            // control packets.
            RtpParticipant participant = RtpParticipant
                    .createReceiver(new RtpParticipantInfo(ssrc), LOCAL_BIND_HOST,
                            10000 + (i * 2), 20001 + (i * 2));
            // The local participant is attached to a new session.
            sessions[i] = new MultiParticipantSession("session" + i, 8, participant);
            sessions[i].init();
            final AtomicInteger counter = new AtomicInteger();
            counters[i] = counter;

            // Listeners for data and control packets coming from the remote side.
            sessions[i].addDataListener(new RtpSessionDataListener() {
                @Override
                public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
                    System.err.println(session.getId() + " received data from " + participant + ": " + packet);
                    // With N == 1 the threshold is 0, which incrementAndGet()
                    // never returns, so the latch is not released; nothing in
                    // this class waits on it.
                    if (counter.incrementAndGet() == ((N - 1) * 2)) {
                        latch.countDown();
                    }
                }
            });
            sessions[i].addControlListener(new RtpSessionControlListener() {
                @Override
                public void controlPacketReceived(RtpSession session, CompoundControlPacket packet) {
                    System.err.println("CompoundControlPacket received by session " + temp);
                }
                @Override
                public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
                    // Report the packet type that was actually received.
                    System.err.println("AppDataPacket received by session " + temp);
                }
            });
        }

        // The remote participant stands for the server: an identifier, its
        // address and the data/control ports printed by the server on start-up.
        RtpParticipant participant = RtpParticipant
                .createReceiver(new RtpParticipantInfo(22), remoteHost, remoteDataPort, remoteControlPort);
        System.err.println("Adding " + participant + " to session " + sessions[0].getId());
        // Register the remote participant as a receiver of the first session.
        sessions[0].addReceiver(participant);

        // A sample payload.
        byte[] deadbeef = {(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef};
        for (byte i = 0; i < 100; i++) {
            // Send the payload from the local participant to the receivers
            // of the session; here there is only one, the server.
            sessions[0].sendData(deadbeef, 0x45, false);
            sessions[0].sendData(deadbeef, 0x45, false);
        }

    }
}

如果客户端和服务器位于同一主机上,则上述代码有效,但在不同主机上运行时无效。在后一种情况下,我无法在服务器端看到数据包。在进一步调试 Efflux 时,它使用一些数据报通道来发送/接收数据包。但是我也看不到任何错误消息。
我将两台主机/笔记本电脑都连接到同一个手机的热点。

但是,在远程服务器的情况下,如果我使用Datagram套接字发送数据包(如下所示),那么我可以在服务器端看到数据包。

        // Plain-UDP comparison: send the same 4-byte payload directly to the
        // server's data port without going through efflux.
        InetAddress address = null;
        try {
            address = InetAddress.getByName("192.168.43.11");
        } catch (UnknownHostException e) {
            // The failure is only printed; address stays null and the send
            // loop below still runs.
            e.printStackTrace();
        }
        // The no-argument constructor leaves the choice of local port (and
        // local address) to the system rather than pinning it to loopback.
        DatagramSocket socket= null;
        try {
            socket = new DatagramSocket();
        } catch (SocketException e) {
            // As above: socket stays null if creation fails.
            e.printStackTrace();
        }

        byte[] deadbeef = {(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef};
        // Sends the payload to port 63302 in an endless loop with no pause.
        while(true){
                DatagramPacket pkt = new DatagramPacket(deadbeef, deadbeef.length, address, 63302);
                // NOTE(review): send() throws IOException, which is not caught
                // here - presumably the enclosing method declares it; confirm.
                socket.send(pkt);
                // NOTE(review): i is not declared anywhere in this fragment -
                // presumably a counter from the omitted surrounding code; confirm.
                System.err.println("sending data " + pkt +" " + i);
        }

我已经被这个问题困扰了 5 天,仍然无法解决。问题是我无法追踪发送数据包时故障发生在哪里:是网络连接的问题,还是 efflux 的问题。我使用 IntelliJ 来运行这段 efflux 代码。

4

0 回答 0