我有一台服务器正在侦听 UDP 端口上的数据,并且我正在使用 netty。我的服务器有自己的帧解码器、执行处理程序和业务特定处理程序。我的测试客户端是用java DatagramSocket(不使用netty)制作的,只是在监听回复之前发送数据。请参阅下面的示例。
在我的第一个版本中,我没有在我的服务器中发回任何回复,并且我能够运行数千个并发客户端重复发送请求而没有任何问题。我的服务器完美地处理了数十万个请求。
然后,我必须在我的业务特定处理程序中为服务器端收到的每个请求添加回复。我只是按照netty 端提供的示例进行操作,即将我的数据写入事件缓冲区。在客户端,我重用了发送数据的同一个 udp 套接字来接收响应。该机制非常适用于一个客户端向我的服务器发送数百个连续请求,每个请求都正确回复。但是,在我的客户端关闭(而不是关闭套接字)的那一刻,我的服务器停止接受任何其他请求。我必须重新启动服务器才能再次运行该过程。当然,当运行多个并发客户端时,只有一个运行良好,只有一次。
我使用wireshark分析发送和接收的数据。所有数据都正确发送到服务器,服务器在客户端第一次运行时正确返回回复。但是在我停止客户端尝试重新运行它的那一刻,我的服务器停止处理数据。使用wireshark,我可以看到数据确实被发送到服务器,实际上是服务器本身停止处理数据。
有人可以告诉我在那个例子中我做错了什么吗?
这是我的客户端的简单版本:
public class UDPClientTester
{
public void sendAndReceiveClientTester() throws IOException
{
final InetAddress lIpAddress = InetAddress.getByName( "localhost" );
int lPort = 50000;
int lNbrRepeat = 1;
byte[] lDataToSendAsBytes = {1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0}; //new byte[20];
byte[] lReceivedData = new byte[22];
final DatagramSocket udpSocket = new DatagramSocket();
DatagramPacket lSendPacket = new DatagramPacket( lDataToSendAsBytes, lDataToSendAsBytes.length, lIpAddress, lPort );
DatagramPacket lReceivePacket = new DatagramPacket( lReceivedData, 22 );
// No metter the number of repeats, the server will accept all requests and return a reply for each one.
for ( byte k = 0; k < lNbrRepeat; k++ )
{
// Send data...
udpSocket.send( lSendPacket );
// Receive response... Block here until a response is received.
udpSocket.receive( lReceivePacket );
}
udpSocket.close();
// At the moment I close the socket, the server stop to accept anymore requests.
// I cannot run this example again until I restart the server!
}
}
这是我的服务器的一个简单版本,仅显示 main() 和处理程序(不显示解码器):
public class TesterChannelHandler extends SimpleChannelHandler
{
public void main(final String[] args) throws Exception
{
DatagramChannelFactory f = new NioDatagramChannelFactory( Executors.newCachedThreadPool() );
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
// Configure the pipeline factory.
b.setPipelineFactory(new ChannelPipelineFactory() {
ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576) )
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new OuterFrameDecoder( null ),
executionHandler,
new TesterChannelHandler() );
}
});
// Enable broadcast
b.setOption( "broadcast", "false" );
b.setOption( "receiveBufferSizePredictorFactory",
new FixedReceiveBufferSizePredictorFactory(1024) );
// Bind to the port and start the service.
b.bind( new InetSocketAddress(50000) );
}
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
{
Channel lChannel = e.getChannel();
SocketAddress lRemoteAddress = e.getRemoteAddress();
byte[] replyFrame = {0,9,8,7,6,5,4,3,2,1};
ChannelBuffer lReceivedBuffer = (ChannelBuffer) e.getMessage();
ChannelBuffer lReplyBuffer = ChannelBuffers.wrappedBuffer( replyFrame );
if ( lChannel != null && lChannel.isWritable() && lRemoteAddress != null )
{
// OK, I FOUND THE ERROR!! I DO NOT NEED TO CONNECT THE CHANNEL! I MUST USE THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AND PASS IT TO THE WRITE METHOD BELOW.
if ( !e.getChannel().isConnected() ) e.getChannel().connect( e.getRemoteAddress() );
// BELOW I SHOULD PASS THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AS THE SECOND ARGUMENT OF THE WRITE METHOD.
e.getChannel().write( lReplyBuffer );
}
}
}