0

我需要能够在单独的线程中隔离某些处理。只有某些消息需要这种额外的处理。它确实涉及本机调用和远程资源;因此,我真的希望能够使用我自己的线程池/队列来严格控制这个处理。

我的初始实现使用了一个可以从我的 ChannelHandler 访问的 ThreadPoolExecutor。一旦我的处理程序识别出需要特殊处理的消息,我将我的 Worker 提交给 Executor 进行处理。请注意,我确实看到了关于扩展 ExecutionHandler 和过滤的帖子,但我认为我仍然会遇到同样的问题。

问题是在工作进程完成后,我希望消息在我的管道中定义的 ExecutionHandler/ThreadPool 中继续。因此,释放“worker”线程;再次,工作线程池在队列大小、最大线程等方面有严格的限制。

下面是我的 Worker(我的 ChannleHandler 的内部类)。在底部,我得到了管道中使用的 ExecutionHandler 并调用了 handleUpstream 方法。似乎对我有用,但想知道是否有任何我没有考虑的隐藏问题......

public class EncryptionWorker implements Runnable
{
    private ChannelHandlerContext _ctx;
    private MessageEvent _e;

    public EncryptionWorker(ChannelHandlerContext ctx, MessageEvent e)
    {
        _ctx = ctx;
        _e = e;
    }

    @Override
    public void run()
    {
        Object m = _e.getMessage();
        if ( !( m instanceof AESSLMessageCtx ) )
        {            
            LOGGER.error( "Invalid Message sent to EncryptionWorker.");
            return;
        }
        AESSLMessageCtx msgContext = (AESSLMessageCtx) m;

        LOGGER.info( "EncryptionWorker initiated, id: " + msgContext.getIdentifier());

        // TODO - Process the  Message.      
        try 
        {
            // Encryption needs to be isolated from main pipeline.
            // Only occurs for special messages; could block for
            // a period of time; do not want it affecting normal
            // traffic.
            //.......
            Thread.sleep( 2000 );
        }
        catch (Exception ex)
        {
            LOGGER.warn( "Exception processing encrypted message, id: " +  msgContext.getIdentifier(), ex );
            sendError( _ctx.getChannel(), msgContext, TxnMessageCtx.ErrorReason.SYSLEVEL_ERROR );
            return;
        }

        // Continue processing message as normal....
        // Do not want to do "_ctx.sendUpstream( _e )"
        // because it continues to use my EncryptionWorker Thread.
        // I would rather do something like the following in order to
        // get it back into service pipeline Executor Threadpool.

        // Get ExecutionHandler being used in pipeline.
        ChannelHandler ch = _ctx.getPipeline().get( "executor" );
        if (ch == null || !(ch instanceof ExecutionHandler))
        {
            LOGGER.error( "ExecutionHandler not found.");
            return;                
        }
        ExecutionHandler execHandler = (ExecutionHandler) ch;

        try
        {
            // Use ExecutionHandler Executor to continue processing
            execHandler.handleUpstream( _ctx, _e );
        }
        catch (Exception ex)
        {
            LOGGER.error( "Exception initiating upstream handling after encryption processing, tpdu: " + txnContext.getIdentifier(), ex );
            sendError( _ctx.getChannel(), msgContext, TxnMessageCtx.ErrorReason.SYSLEVEL_ERROR );
            return;                
        }

        return;
    }
}
4

0 回答 0