我正在尝试使用 ChannelContext.attr() 方法将对象从一个处理程序传递到另一个处理程序:
private class TCPInitializer extends ChannelInitializer<SocketChannel>
{
@Override
public void initChannel(final SocketChannel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ReadTimeoutHandler(mIdleTimeout));
pipeline.addLast(new HostMappingHandler<SyslogHandler>(mRegisteredClients,
mChannels));
pipeline.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_SIZE,
Delimiters.lineDelimiter()));
pipeline.addLast(new Dispatcher());
}
}
HostMappingHandler 是一个将主机映射到数据的模板类:
public class HostMappingHandler<T> extends ChannelStateHandlerAdapter
{
public static final AttributeKey<HostMappedObject> HOST_MAPPING =
new AttributeKey<HostMappedObject>("HostMappingHandler.attr");
private final ChannelGroup mChannels;
private final Map<String, T> mMap;
public HostMappingHandler(Map<String, T> registrations,
ChannelGroup channelGroup)
{
mChannels = channelGroup;
mMap = registrations;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception
{
SocketAddress addr = ctx.channel().remoteAddress();
T mappedObj = null;
if (addr instanceof InetSocketAddress)
{
String host = ((InetSocketAddress) addr).getHostName().toLowerCase();
mappedObj = mMap.get(host);
}
if (mappedObj != null)
{
// Add the channel to the list so it can be easily removed if unregistered
mChannels.add(ctx.channel());
// Attach the host-mapped object
ctx.attr(HOST_MAPPING).set(new HostMappedObject<T>(mappedObj));
}
else
{
log.debug("Bad host [" + addr + "]; aborting connection request");
ctx.channel().close();
}
super.channelRegistered(ctx);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception
{
// Find the parser for this host. If the host is no longer registered,
// disconnect the client.
if (ctx.channel().remoteAddress() instanceof InetSocketAddress)
{
InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress();
T handler = mMap.get(addr.getHostName().toLowerCase());
if (handler == null)
{
log.debug("Host no longer registered");
ctx.channel().close();
}
else
{
log.debug("Sanity Check: " + ctx.attr(HostMappingHandler.HOST_MAPPING).get());
}
}
super.inboundBufferUpdated(ctx);
}
// ==================================================
public static class HostMappedObject<C>
{
private final C mObj;
public HostMappedObject(final C in)
{
mObj = in;
}
public C get()
{
return mObj;
}
}
}
而调度器目前非常简单:
private static class Dispatcher extends ChannelInboundMessageHandlerAdapter<Object>
{
@Override
public void messageReceived(final ChannelHandlerContext ctx,
final Object msg)
throws Exception
{
HostMappingHandler.HostMappedObject wrapper =
ctx.attr(HostMappingHandler.HOST_MAPPING).get();
log.debug("Received: " + wrapper);
}
}
顺便说一句,初始化程序和调度程序是“服务器”对象的私有类。但是,当我运行一个注册 localhost 并尝试连接它的单元测试时,它会失败并且我的调试输出显示:
HostMappingHandler.inboundBufferUpdated: Sanity Check: test.comms.netty.HostMappingHandler$HostMappedObject@eb017e
Dispatcher.messageReceived: Received: null
因此,在通道注册和接收 HostMapper 类中的入站消息之间肯定会保留映射,并且在调试器中进一步调查表明 Dispatcher 中上下文的属性映射确实有一个条目 - 问题是该值为 NULL而不是对象。
我是否遗漏了一些明显的东西,或者只是一个 alpha 错误?
编辑:
我现在已经通过将数据附加到 Dispatcher 的上下文来解决这个问题,让 HostMappingHandler 也接受一个要附加的类。该错误似乎是在处理程序的 ChannelHandlerContext 中设置属性会导致该属性出现在另一个具有正确键但值为 NULL 的处理程序中。在不知道所需工作流的情况下,错误是根本不应该出现键,或者值不应该为空。
public HostMappingHandler(final Map<String, T> registrations,
final ChannelGroup channelGroup,
final Class<? extends ChannelHandler> channelHandler)
{
mChannels = channelGroup;
mMap = registrations;
mHandler = channelHandler;
}
//...
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception
{
//...
// Attach the host-mapped object
ctx.pipeline().context(mHandler).attr(HOST_MAPPING).set(new HostMappedObject<T>(mappedObj));
//...
}