我一直在为如何配置 Netty、把字节以流的方式传输到 ClamAV 服务而苦恼。我运行的是一条 Apache Camel 路由。
使用 Netty 时,我无法拦截到 “INSTREAM size limit exceeded”(超出 INSTREAM 大小限制)这条消息。
INSTREAM:该命令前必须加上 n 或 z 前缀。用于扫描数据流。发送 INSTREAM 之后,数据流会在发送命令的同一个套接字上以块的形式发送给 clamd。这避免了建立新 TCP 连接的开销以及 NAT 带来的问题。块的格式是:'<length><data>',其中 <length> 是其后数据的字节数,以网络字节序的 4 字节无符号整数表示,<data> 是实际的数据块。发送一个长度为零的块即可终止流式传输。注意:不要超过 clamd.conf 中定义的 StreamMaxLength,否则 clamd 将回复 “INSTREAM size limit exceeded” 并关闭连接。
使用直接同步套接字连接我没有问题。谁能指出我应该如何使用 Netty 来做到这一点的正确方向?或者我应该坚持使用同步套接字连接。
以下是使用同步套接字的实现,代码来自 https://github.com/solita/clamav-java ,作者 Antti Virtanen。
/**
 * Camel processor that scans the exchange body with clamd over a plain blocking socket.
 *
 * <p>The body is streamed to {@code localhost:3310} with the {@code zINSTREAM} command:
 * every chunk is sent as a 4-byte big-endian length followed by the chunk bytes, and a
 * zero-length chunk terminates the stream. The verdict is stored in the {@code av-status}
 * header of the exchange's in-message as {@code clean}, {@code overflow} or {@code infected}
 * (any reply other than the two recognised literals is reported as {@code infected}).
 */
private class UseSocket implements Processor {

    /**
     * Streams the exchange body to clamd and records the verdict on the exchange.
     *
     * @param exchange the exchange whose in-message body is scanned and whose
     *                 {@code av-status} header receives the verdict
     * @throws Exception if connecting, reading the body, or talking to clamd fails
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
             Socket socket = new Socket("localhost", 3310);
             BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream());
             BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())) {
            // Explicit charset: the command must not depend on the platform default encoding.
            socketOutput.write("zINSTREAM\0".getBytes(StandardCharsets.UTF_8));
            socketOutput.flush();

            byte[] chunk = new byte[2048];
            for (int chunkSize = message.read(chunk); chunkSize > -1; chunkSize = message.read(chunk)) {
                socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
                socketOutput.write(chunk, 0, chunkSize);
                socketOutput.flush();
                // Non-blocking poll: an early reply (e.g. the size-limit error) ends the scan.
                if (processReply(socketInput, exchange)) {
                    return;
                }
            }

            // A zero-length chunk tells clamd that the stream is complete.
            socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
            socketOutput.flush();

            // The verdict may not have arrived yet, so wait for it here instead of polling
            // available(); polling could silently skip the reply and leave the header unset.
            applyReply(readReply(socketInput), exchange);
        }
    }

    /**
     * Checks, without blocking, whether clamd has already replied; if so, records the verdict.
     *
     * @param in       buffered view of the socket's input stream
     * @param exchange the exchange that receives the {@code av-status} header
     * @return {@code true} if a reply was read and the header was set, {@code false} otherwise
     * @throws Exception if reading from the socket fails
     */
    private boolean processReply(BufferedInputStream in, Exchange exchange) throws Exception {
        if (in.available() <= 0) {
            return false;
        }
        logger.info("processing reply");
        return applyReply(readReply(in), exchange);
    }

    /**
     * Reads one reply, blocking until its NUL terminator, end of stream, or 256 bytes.
     *
     * @param in buffered view of the socket's input stream
     * @return the reply text including the trailing NUL when one was received; empty if the
     *         stream ended before any byte arrived
     * @throws Exception if reading from the socket fails
     */
    private String readReply(BufferedInputStream in) throws Exception {
        byte[] replyBytes = new byte[256];
        int replySize = 0;
        while (replySize < replyBytes.length) {
            int next = in.read();
            if (next < 0) {
                break;
            }
            replyBytes[replySize++] = (byte) next;
            if (next == 0) {
                // The recognised replies are NUL-terminated; stop at the terminator so a
                // reply split across several TCP segments is still compared as a whole.
                break;
            }
        }
        return new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
    }

    /**
     * Maps a clamd reply to an {@code av-status} value and stores it on the exchange.
     *
     * @param reply    the raw reply text; an empty reply is ignored
     * @param exchange the exchange that receives the {@code av-status} header
     * @return {@code true} if the header was set, {@code false} if the reply was empty
     */
    private boolean applyReply(String reply, Exchange exchange) {
        if (reply.isEmpty()) {
            return false;
        }
        String avStatus = "infected";
        if ("stream: OK\0".equals(reply)) {
            avStatus = "clean";
        } else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
            avStatus = "overflow";
        }
        exchange.getIn().setHeader("av-status", avStatus);
        return true;
    }
}
使用带有入站和出站通道处理程序的 Netty 实现。
/**
 * Camel processor that scans the exchange body with clamd through a Netty client.
 *
 * <p>A new event loop group and connection to {@code localhost:3310} are created for every
 * exchange. The calling thread blocks until the channel is closed; the pipeline installed by
 * {@link ClamAvChannelIntializer} is responsible for streaming the body and recording the
 * verdict on the exchange. Failures are logged and not rethrown.
 */
private class UseNetty implements Processor {

    /**
     * Connects to clamd and waits until the connection has been closed.
     *
     * @param exchange the exchange handed to the channel pipeline for scanning
     * @throws Exception declared by {@code Processor}; failures inside this method are logged
     *                   rather than propagated
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        logger.info(CLASS_NAME + ": Creating Netty client");
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("localhost", 3310))
                    .handler(new ClamAvChannelIntializer(exchange));
            ChannelFuture channelFuture = bootstrap.connect().sync();
            // Block until a handler (or the peer) closes the channel.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.error(CLASS_NAME + ": ERROR", ex);
        } catch (Exception ex) {
            logger.error(CLASS_NAME + ": ERROR", ex);
        } finally {
            eventLoopGroup.shutdownGracefully();
            logger.info(CLASS_NAME + ": Netty client closed");
        }
    }
}
/**
 * Channel initializer that installs the ClamAV client handlers on a new socket channel:
 * first a {@link ClamAvClientWriter}, then a {@link ClamAvClientHandler} bound to the
 * exchange being scanned.
 */
public class ClamAvChannelIntializer extends ChannelInitializer<SocketChannel> {

    /** Exchange handed to the inbound handler of every channel this initializer sets up. */
    private final Exchange exchange;

    /**
     * @param exchange the exchange passed on to {@link ClamAvClientHandler}
     */
    public ClamAvChannelIntializer(Exchange exchange) {
        this.exchange = exchange;
    }

    /**
     * Appends the writer and then the reply handler to the channel's pipeline.
     *
     * @param socketChannel the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                .addLast(new ClamAvClientWriter())
                .addLast(new ClamAvClientHandler(exchange));
    }
}
/**
 * Inbound handler that starts the scan when the channel becomes active and turns clamd's
 * reply into the {@code av-status} header of the exchange's in-message.
 *
 * <p>The header is set to {@code clean}, {@code overflow} or {@code infected}; any reply
 * other than the two recognised literals is reported as {@code infected}.
 */
public class ClamAvClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    String CLASS_NAME;
    Logger logger;
    private Exchange exchange;
    public static final int MAX_BUFFER = 2048;

    /**
     * @param exchange the exchange to scan and to annotate with the verdict
     */
    public ClamAvClientHandler(Exchange exchange) {
        super();
        CLASS_NAME = this.getClass().getName();
        logger = LoggerFactory.getLogger(CLASS_NAME);
        this.exchange = exchange;
    }

    /**
     * Hands the exchange to the outbound side of the pipeline once the connection is up.
     * The write is not flushed here.
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        logger.info(CLASS_NAME + ": Entering channelActive");
        channelHandlerContext.write(exchange);
        logger.info(CLASS_NAME + ": Exiting channelActive");
    }

    /**
     * Logs the failure through the handler's logger and closes the channel.
     * No {@code av-status} header is set on this path.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) {
        // Report through the logger (not stderr) so the failure lands in the application log.
        logger.error(CLASS_NAME + ": ERROR", cause);
        channelHandlerContext.close();
    }

    /**
     * Interprets the received bytes as clamd's reply, records the verdict and closes the
     * channel.
     *
     * <p>NOTE(review): the buffer is treated as the complete reply; a reply delivered in
     * several reads would be classified from its first fragment only.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        logger.info(CLASS_NAME + ": Entering channelRead0");
        String reply = byteBuf.toString(CharsetUtil.UTF_8);
        logger.info(CLASS_NAME + ": Reply = " + reply);
        String avStatus = "infected";
        if ("stream: OK\0".equals(reply)) {
            avStatus = "clean";
        } else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
            avStatus = "overflow";
        } else {
            logger.warn("Infected or unknown reply = " + reply);
        }
        exchange.getIn().setHeader("av-status", avStatus);
        logger.info(CLASS_NAME + ": Exiting channelRead0");
        channelHandlerContext.close();
    }
}
public class ClamAvClientWriter extends ChannelOutboundHandlerAdapter {
String CLASS_NAME;
Logger logger;
public static final int MAX_BUFFER = 64000;//2^16
public ClamAvClientWriter(){
CLASS_NAME = this.getClass().getName();
logger = LoggerFactory.getLogger(CLASS_NAME);
}
@Override
public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception{
logger.info(CLASS_NAME + ": Entering write");
Exchange exchange = (Exchange)o;
try(BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class))){
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("zINSTREAM\0".getBytes()));
byte[] chunk = new byte[MAX_BUFFER];
for(int i=message.read(chunk);i>-1;i=message.read(chunk)){
byte[] chunkSize = ByteBuffer.allocate(4).putInt(i).array();
channelHandlerContext.write(Unpooled.copiedBuffer(chunkSize));
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(chunk, 0, i));
}
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(ByteBuffer.allocate(4).putInt(0).array()));
}
logger.info(CLASS_NAME + ": Exiting write");
}
}