我需要能够在单独的线程中隔离某些处理。只有某些消息需要这种额外的处理。它确实涉及本机调用和远程资源;因此,我真的希望能够使用我自己的线程池/队列来严格控制这个处理。
我的初始实现使用了一个可以从我的 ChannelHandler 访问的 ThreadPoolExecutor。一旦我的处理程序识别出需要特殊处理的消息,我将我的 Worker 提交给 Executor 进行处理。请注意,我确实看到了关于扩展 ExecutionHandler 和过滤的帖子,但我认为我仍然会遇到同样的问题。
问题是在工作进程完成后,我希望消息在我的管道中定义的 ExecutionHandler/ThreadPool 中继续。因此,释放“worker”线程;再次,工作线程池在队列大小、最大线程等方面有严格的限制。
下面是我的 Worker(我的 ChannleHandler 的内部类)。在底部,我得到了管道中使用的 ExecutionHandler 并调用了 handleUpstream 方法。似乎对我有用,但想知道是否有任何我没有考虑的隐藏问题......
public class EncryptionWorker implements Runnable
{
private ChannelHandlerContext _ctx;
private MessageEvent _e;
public EncryptionWorker(ChannelHandlerContext ctx, MessageEvent e)
{
_ctx = ctx;
_e = e;
}
@Override
public void run()
{
Object m = _e.getMessage();
if ( !( m instanceof AESSLMessageCtx ) )
{
LOGGER.error( "Invalid Message sent to EncryptionWorker.");
return;
}
AESSLMessageCtx msgContext = (AESSLMessageCtx) m;
LOGGER.info( "EncryptionWorker initiated, id: " + msgContext.getIdentifier());
// TODO - Process the Message.
try
{
// Encryption needs to be isolated from main pipeline.
// Only occurs for special messages; could block for
// a period of time; do not want it affecting normal
// traffic.
//.......
Thread.sleep( 2000 );
}
catch (Exception ex)
{
LOGGER.warn( "Exception processing encrypted message, id: " + msgContext.getIdentifier(), ex );
sendError( _ctx.getChannel(), msgContext, TxnMessageCtx.ErrorReason.SYSLEVEL_ERROR );
return;
}
// Continue processing message as normal....
// Do not want to do "_ctx.sendUpstream( _e )"
// because it continues to use my EncryptionWorker Thread.
// I would rather do something like the following in order to
// get it back into service pipeline Executor Threadpool.
// Get ExecutionHandler being used in pipeline.
ChannelHandler ch = _ctx.getPipeline().get( "executor" );
if (ch == null || !(ch instanceof ExecutionHandler))
{
LOGGER.error( "ExecutionHandler not found.");
return;
}
ExecutionHandler execHandler = (ExecutionHandler) ch;
try
{
// Use ExecutionHandler Executor to continue processing
execHandler.handleUpstream( _ctx, _e );
}
catch (Exception ex)
{
LOGGER.error( "Exception initiating upstream handling after encryption processing, tpdu: " + txnContext.getIdentifier(), ex );
sendError( _ctx.getChannel(), msgContext, TxnMessageCtx.ErrorReason.SYSLEVEL_ERROR );
return;
}
return;
}
}