我使用netty将文件数据(master)发送给其他两个slave(所有linux操作系统),发送文件如下: master发送8个文件(大小:5~200m左右)使用Netty NioServerSocketChannel:
if (file.exists() && file.isFile()) {
try {
int count = 0;
int cacheLen = 40960;
if (file.length() % cacheLen == 0) {
count = (int) (file.length() / cacheLen);
} else {
count = (int) (file.length() / cacheLen) + 1;
}
FileInputStream in = new FileInputStream(file);
byte[] buffer = new byte[cacheLen];
int readed;
int index = 0;
while ((readed = in.read(buffer)) != -1) {
byte[] tmp = new byte[readed];
System.arraycopy(buffer,0,tmp,0,readed);
FilePack filePack = new FilePack(file.getName(), index, count, tmp);
sendMessage(channel, filePack);
index++;
}
log.info("master to:{},file:{},total :{},sended: {}" ,channel.remoteAddress().toString(),file.getName(),count,index);
} catch (Throwable e) {
log.error("send file error", e);
sendErrInfo(channel,file);
}
}
发送方法如下:
public SyncGetFuture send(Channel channel, Object message, Long sessionId) throws NetException {
if(!channel.isActive()) {
this.getChannelHub().removeChannelIfDisconnected(channel);
throw new NetException("Failed to send message " + message + " to " + channel.remoteAddress() + ", cause: channel is not active.");
} else if(!channel.isWritable()) {
throw new NetException("Failed to send message " + message + " to " + channel.remoteAddress() + ", cause: channel is not writable.");
} else {
//new an packet
Packet packet = PacketCreator.create(channel.getVersion(), PacketType.BUSINESS, sessionId, message);
try {
nettyChannel.writeAndFlush(packet);
return sessionId != null?this.getSyncFutureManager().newFuture(sessionId):null;
} catch (Throwable var7) {
throw new NetException("Failed to send message " + message + " to " + channel.remoteAddress() + ", cause: " + var7.getMessage(), var7);
}
}
}
网络服务器:
this.bossGroup = new NioEventLoopGroup(this.configuration.getBossThreads());
this.workerGroup = new NioEventLoopGroup(this.configuration.getWorkerThreads());
final NettyServerHandler4 nettyServerHandler = new NettyServerHandler4(this.configuration, this, (ChannelHub4)this.getChannelHub());
ServerBootstrap b = new ServerBootstrap();
((ServerBootstrap)((ServerBootstrap)b.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class)).handler(new LoggingHandler(LogLevel.INFO))).childHandler(new ChannelInitializer() {
public void initChannel(SocketChannel ch) throws IOException {
SocketConfig config = SocketServer4.this.configuration.getSocketConfig();
if(config.getWriteBufferHighWaterMark() != null) {
ch.config().setOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(config.getWriteBufferHighWaterMark().intValue()));
}
if(config.getWriteBufferLowWaterMark() != null) {
ch.config().setOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(config.getWriteBufferLowWaterMark().intValue()));
}
ch.pipeline().addLast("messageDecoder", new PacketDecoder(1048576));
ch.pipeline().addLast("messageEncoder", new PacketEncoder());
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, SocketServer4.this.getConfiguration().getAllIdleTimeSeconds() * 2));
ch.pipeline().addLast("nettyServerHandler", nettyServerHandler);
}
});
网络客户端:
final NettyClientHandler4 stop = new NettyClientHandler4(this);
Bootstrap i$ = new Bootstrap();
((Bootstrap)((Bootstrap)((Bootstrap)i$.group(this.group)).channel(NioSocketChannel.class)).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(true))).handler(new ChannelInitializer() {
public void initChannel(SocketChannel ch) throws Exception {
SocketConfig config = SocketClient4.this.getConfiguration().getSocketConfig();
if(config.getWriteBufferHighWaterMark() != null) {
ch.config().setOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(config.getWriteBufferHighWaterMark().intValue()));
}
if(config.getWriteBufferLowWaterMark() != null) {
ch.config().setOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(config.getWriteBufferLowWaterMark().intValue()));
}
ch.pipeline().addLast("messageDecoder", new PacketDecoder(2147483647));
ch.pipeline().addLast("messageEncoder", new PacketEncoder());
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, SocketClient4.this.getConfiguration().getAllIdleTimeSeconds()));
ch.pipeline().addLast("heartBeatHandler", new HeartBeatReqHandler());
ch.pipeline().addLast("nettyClientHandler", stop);
}
});
当主机同时发送所有文件时,最后两个文件或三个文件总是在从机丢失数据;但是下一次slave专门去获取这些文件时可以下载成功;
日志显示数据丢失,file7和file8突然得到最后一个pack:
16:**:**.*** logback [nioEventLoopGroup-2-1] INFO c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file7,total:6643,cout:0
.......(downloading 1,2,3,4...5487 correctly)
16:31:18.080 logback [nioEventLoopGroup-2-1] INFO c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file7,total:6643,cout:5488
16:31:18.097 logback [nioEventLoopGroup-2-1] INFO c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file7,total:6643,cout:6642
16:31:18.100 logback [nioEventLoopGroup-2-1] INFO c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file8,total:98,cout:97
无论 master 还是 slave ,日志都没有抛出任何 net 异常;
当我在文件发送的时候睡觉时,它工作正常:
while ((readed = in.read(buffer)) != -1) {
byte[] tmp = new byte[readed];
System.arraycopy(buffer,0,tmp,0,readed);
FilePack filePack = new FilePack(file.getName(), index, count, tmp);
sendMessage(channel, filePack);
Thread.sleep(10);
index++;
}