我不得不在一个项目中使用,经过一番搜索,我在 github 上rtp/rtcp
遇到了https://github.com/jonbo372/efflux java 项目。这个项目没有一个完整的工作示例,所以在尝试了很多之后,我制作了一些测试文件并让它们运行。在我的设置中,有一个 python 服务器,它打开两个端口,一个用于rtp
,另一个用于rtcp
,并接受其上的数据和一个使用 efflux 库通过rtp/rtcp
procol 发送数据的客户端。
服务器代码:
import socket, simpleaudio as sa
import threading, queue
from multiplex import *
from threading import Thread
import time
class ServerUDP:
existing = []
joined = []
def __init__(self):
while 1:
try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.s.bind(('127.0.0.1', 0))
self.clients = set()
self.recvPackets = queue.Queue()
break
except:
print("Couldn't bind to that RTP port")
while 1:
try:
self.s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.s2.bind(('127.0.0.1', 0))
self.clients2 = set()
self.recvPackets2 = queue.Queue()
break
except:
print("Couldn't bind to that RTP port")
def get_ports(self):
return self.s.getsockname()
def get_ports2(self):
return self.s2.getsockname()
def RecvData(self, name, delay, run_event):
while run_event.is_set():
data,addr = self.s.recvfrom(1600)
print(addr, "read", data)
y = list(addr)
y[1] = 10000 #Android clients use port 5000 to get the data
addr = tuple(y)
self.clients.add(addr)
self.recvPackets.put((data,addr)) #add the data and sender's data to the queue
def RecvData2(self, name, delay, run_event):
while run_event.is_set():
data,addr = self.s2.recvfrom(1600)
print(addr, "read", data)
y2 = list(addr)
y2[1] = 20001 #Android clients use port 5000 to get the RTCP packets
addr = tuple(y2)
self.clients2.add(addr)
self.recvPackets2.put((data,addr)) #add the data and sender's data to the queue
def close(self):
self.s.close()
def close2(self):
self.s2.close()
if __name__ == "__main__":
roomserver = ServerUDP()
run_event = threading.Event()
run_event.set()
run_event2 = threading.Event()
run_event2.set()
d1 = 1
d2 = 1
t = Thread(target= roomserver.RecvData, args = ("bob",d1,run_event))
t2 = Thread(target= roomserver.RecvData2, args= ("allen",d2,run_event2))
# t.daemon = True
t.start()
t2.start() #Start a new server for this room
port = roomserver.get_ports()[1]
port2 = roomserver.get_ports2()[1]
# self.rooms[data[1]] = [t, data[2], port]
# port_byte = port.to_bytes(4, 'big')
# send(self.request, port_byte, encode= False) #Send the port number. An ID should also be sent for every client
print("port is", port)
print("port2 is", port2)
# print("next iteration")
try:
while 1:
time.sleep(.1)
except KeyboardInterrupt:
print("attempting to close threads. Max wait =",d1)
run_event.clear()
run_event2.clear()
t.join()
t2.join()
print("threads successfully closed")
服务器将打印udp
与客户端通信所需的两个端口。
客户端代码:
import com.biasedbit.efflux.packet.AppDataPacket;
import com.biasedbit.efflux.packet.CompoundControlPacket;
import com.biasedbit.efflux.packet.ControlPacket;
import com.biasedbit.efflux.packet.DataPacket;
import com.biasedbit.efflux.participant.RtpParticipant;
import com.biasedbit.efflux.participant.RtpParticipantInfo;
import com.biasedbit.efflux.session.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.util.HashedWheelTimer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.UUID;
import com.biasedbit.efflux.participant.SsrcGenerator;
public class test {
private static final byte N = 1;
private static MultiParticipantSession[] sessions;
public static void main(String[] args) {
//sessions is an array of session where each session has a local participant and a remote participant
sessions = new MultiParticipantSession[N];
final AtomicInteger[] counters = new AtomicInteger[N];
final CountDownLatch latch = new CountDownLatch(N);
for(byte i = 0; i < N; i++)
{
final int temp = i;
long ssrc = 2;
// a local participant is created with an identifier ssrc it's ip address and two ports required to send data packets and \\
// control packets to server side.
RtpParticipant participant = RtpParticipant
.createReceiver(new RtpParticipantInfo(ssrc), "127.0.0.1", 10000 + (i * 2), 20001 + (i * 2));
// above local participant is added to the session
sessions[i] = new MultiParticipantSession("session" + i, 8, participant);
sessions[i].init();
final AtomicInteger counter = new AtomicInteger();
counters[i] = counter;
//following are the two listeners for data and control packets from remote side.
sessions[i].addDataListener(new RtpSessionDataListener() {
@Override
public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println(session.getId() + " received data from " + participant + ": " + packet);
if (counter.incrementAndGet() == ((N - 1) * 2)) {
latch.countDown();
}
}
});
sessions[i].addControlListener(new RtpSessionControlListener() {
@Override
public void controlPacketReceived(RtpSession session, CompoundControlPacket packet) {
System.err.println("CompoundControlPacket received by session " + temp);
}
@Override
public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
System.err.println("CompoundControlPacket received by session " + temp);
}
});
}
// a remote participant is created with an identifier, it's ip address and the two ports to communicate\\
//here we have to put the ports values obtained from running the server code.
RtpParticipant participant = RtpParticipant
.createReceiver(new RtpParticipantInfo(22), "192.168.43.11", 63302, 51755);
System.err.println("Adding " + participant + " to session " + sessions[0].getId());
//remote participant is added as a receiver to the session.
sessions[0].addReceiver(participant);
//a sample byte array of some data.
byte[] deadbeef = {(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef};
for (byte i = 0; i < 100; i++) {
//sending the byte array from local participant to remote participants within the session, in current case there's only one, the python server.
sessions[0].sendData(deadbeef, 0x45, false);
sessions[0].sendData(deadbeef, 0x45, false);
}
}
}
如果客户端和服务器位于同一主机上,则上述代码有效,但在不同主机上运行时无效。在后一种情况下,我无法在服务器端看到数据包。在进一步调试 Efflux 时,它使用一些数据报通道来发送/接收数据包。但是我也看不到任何错误消息。
我将两台主机/笔记本电脑都连接到同一个手机的热点。
但是,在远程服务器的情况下,如果我使用Datagram
套接字发送数据包(如下所示),那么我可以在服务器端看到数据包。
InetAddress address = null;
try {
address = InetAddress.getByName("192.168.43.11");
} catch (UnknownHostException e) {
e.printStackTrace();
}
DatagramSocket socket= null;
try {
socket = new DatagramSocket();
} catch (SocketException e) {
e.printStackTrace();
}
byte[] deadbeef = {(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef};
while(true){
DatagramPacket pkt = new DatagramPacket(deadbeef, deadbeef.length, address, 63302);
socket.send(pkt);
System.err.println("sending data " + pkt +" " + i);
}
自从 5 天以来,我一直坚持这个问题,但仍然无法解决它。问题是我无法追踪发送数据包时发生的故障,是因为网络连接还是流出问题。我Intellij
用来运行这个流出代码。