1

在 Apache PLC4X 项目 ( https://plc4x.apache.org ) 中,我们正在使用 Netty 实现工业 PLC 的驱动程序。这里通常有多种协议是分层的。有时一层需要我们将一条消息拆分为底层的多条消息。现在我们面临一个大问题:一个协议协商每个连接的最大数量的未确认消息。所以我们不能发送超过这个最大值的消息,否则接收者只会发送一个错误响应。

现在我们不需要在 encode 方法中将东西添加到“out”中,而是将它们添加到某种队列中并让一些 Netty 机制负责排空该队列...... Netty 中有这样的机制吗?如果没有,实现这一点的最佳方法是什么?

如果具有良好 Netty 洞察力的人可以加入我们的项目邮件列表 (dev@plc4x.apache.org) 也会很酷,因为我们还在为 Netty 开发一些非常酷的附加功能(以太网帧上的原始套接字传输和 IP 数据包上的一个base) ...我敢打赌,这两个项目都可以相互受益。

4

1 回答 1

3

虽然 Netty 没有提供这种开箱即用的处理程序,但由于内部设计,开箱即用地制作这样的最大并发挂起请求确实很容易。

可以使用PendingWriteQueueNetty 框架中的类结合通用处理程序来制作这样的处理程序:

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;

public class MaxPendingRequestHandler extends ChannelHandlerAdapter {

    private PendingWriteQueue queue;
    private int freeSlots;

    public MaxPendingRequestHandler(int maxRequests) {
        this.freeSlots = maxRequests;
    }

    private synchronized void trySendMessages(ChannelHandlerContext ctx) {
        if(this.freeSlots > 0) {
            while(this.freeSlots > 0) {
                if(this.queue.removeAndWrite() == null) {
                    ctx.flush();
                    return;
                }
                this.freeSlots--;
            }
            ctx.flush();
        }
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.queue = new PendingWriteQueue(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Send everything so we get a proper failurefor those pending writes
        this.queue.removeAndWriteAll();
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        this.queue.removeAndWriteAll();
        super.channelUnregistered(ctx);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.queue.add(msg, promise);
        trySendMessages(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        synchronized(this) {
            this.freeSlots++;
            trySendMessages(ctx);
        }
        super.channelRead(ctx, msg);
    }

}

该处理程序的工作原理是将每条新消息保存在队列中,并在每次写入/读取时检查线路上的空闲槽。

请注意,应将处理程序放置在数据包解码器/编码器之后的管道中,否则将传入数据包计数为潜在的多个数据包时会出现问题,例如:

pipeline.addLast(new PacketCodex()); // A codex exists of an encoder and decoder, you can also ass them seperately
// pipeline.addLast(new TrafficShapingHandler()) // Optional, depending on your required protocols
// pipeline.addLast(new IdleStateHandler())      // Optional, depending on your required protocols
pipeline.addLast(new MaxPendingRequestHandler())
pipeline.addLast(new Businesshandler())

当然,您还想验证我们的处理程序是否有效,这可以使用包含EmbeddedChannel& JUnit 的单元测试来完成:

public class MaxPendingRequestHandlerTest {

    @Test
    public void testMaxPending() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

    @Test
    public void testMaxPendingWhenAResponseHasReceived() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), "4");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

    @Test
    public void testMaxPendingWhenAllResponseHasReceived() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");
        channel.writeInbound("RE: 2");
        channel.writeInbound("RE: 3");
        channel.writeInbound("RE: 4");
        channel.writeInbound("RE: 5");
        channel.writeInbound("RE: 6");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), "4");
        Assert.assertEquals(channel.readOutbound(), "5");
        Assert.assertEquals(channel.readOutbound(), "6");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

    @Test
    public void testMaxPendingWhenAllResponseHasReceivedAndNewMessagesAreSend() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");
        channel.writeInbound("RE: 2");
        channel.writeInbound("RE: 3");
        channel.writeInbound("RE: 4");
        channel.writeInbound("RE: 5");
        channel.writeInbound("RE: 6");

        channel.write("7");
        channel.write("8");
        channel.write("9");
        channel.write("10");

        Assert.assertEquals(channel.readOutbound(), "1");
        Assert.assertEquals(channel.readOutbound(), "2");
        Assert.assertEquals(channel.readOutbound(), "3");
        Assert.assertEquals(channel.readOutbound(), "4");
        Assert.assertEquals(channel.readOutbound(), "5");
        Assert.assertEquals(channel.readOutbound(), "6");
        Assert.assertEquals(channel.readOutbound(), "7");
        Assert.assertEquals(channel.readOutbound(), "8");
        Assert.assertEquals(channel.readOutbound(), "9");
        Assert.assertEquals(channel.readOutbound(), (Object)null);
    }

}
于 2018-04-13T18:21:31.197 回答