1

我正在尝试开发可以接收自定义 UDP 数据包的自定义 Flume 源。这是我的代码:

public class XvlrUdpSource extends AbstractSource
        implements EventDrivenSource, Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(XvlrUdpSource.class);


    private int port;
    private String host;
    private Channel nettyChannel;

    private static final Logger logger = LoggerFactory.getLogger(XvlrUdpSource.class);

    private CounterGroup counterGroup = new CounterGroup();

    public class XvlrUpdHander extends SimpleChannelHandler {

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
            try {
                System.out.println("class: "+ mEvent.getMessage().getClass());
                /** ChannelBuffer holds just first 768 bytes of the whole input UDP packet*/
                ChannelBuffer channelBuffer = (ChannelBuffer)mEvent.getMessage();
                   Event xvlrPacketEvent = EventBuilder.withBody( ((ChannelBuffer)mEvent.getMessage()).array());
                System.out.println("Length is:["+xvlrPacketEvent.getBody().length+"]");
                //Event e = syslogUtils.extractEvent((ChannelBuffer)mEvent.getMessage());
                if(xvlrPacketEvent == null){
                    return;
                }
                getChannelProcessor().processEvent(xvlrPacketEvent);
                counterGroup.incrementAndGet("events.success");
            } catch (ChannelException ex) {
                counterGroup.incrementAndGet("events.dropped");
                logger.error("Error writting to channel", ex);
                return;
            }
        }
    }

    @Override
    public void start() {
        ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap
                (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
        final XvlrUpdHander handler = new XvlrUpdHander();
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(handler);
            }
        });

        if (host == null) {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
        } else {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
        }

        super.start();
    }

    @Override
    public void stop() {
        logger.info("Syslog UDP Source stopping...");
        logger.info("Metrics:{}", counterGroup);
        if (nettyChannel != null) {
            nettyChannel.close();
            try {
                nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.warn("netty server stop interrupted", e);
            } finally {
                nettyChannel = null;
            }
        }

        super.stop();
    }

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(
                context, "port");//SyslogSourceConfigurationConstants.CONFIG_PORT);
        port = context.getInteger("port");//SyslogSourceConfigurationConstants.CONFIG_PORT);
        host = context.getString("host");//SyslogSourceConfigurationConstants.CONFIG_HOST);
        //formaterProp = context.getSubProperties("PROP");//SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
    }

}

我在messageRecieved上进行了调试,并在 stacktrace 中看到:

/**
     * Sends a {@code "messageReceived"} event to the first
     * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
     * the specified {@link Channel} belongs.
     *
     * @param message        the received message
     * @param remoteAddress  the remote address where the received message
     *                       came from
     */
    public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
        channel.getPipeline().sendUpstream(
                new UpstreamMessageEvent(channel, message, remoteAddress));
    }

我的对象消息已经是 768 字节长度。

根在这里org.jboss.netty.channel.socket.oio.OioDatagramWorker

byte[] buf = new byte[predictor.nextReceiveBufferSize()];
            DatagramPacket packet = new DatagramPacket(buf, buf.length);

Predictor 将缓冲区大小设置为 768 然后:

fireMessageReceived(
                    channel,
                    channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
                    packet.getSocketAddress());

我只得到前 768 个字节。有没有机会改变预测者的行为?

4

2 回答 2

1

我找到了这个主题: 每个 UDP 数据报的 Netty 不同的管道

可以使用特殊属性“注入”具有所需行为的预测器。所以完整的解决方案是:

public class XvlrUdpSource extends AbstractSource
        implements EventDrivenSource, Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(XvlrUdpSource.class);


    private int port;
    private String host;
    private Channel nettyChannel;

    private CounterGroup counterGroup = new CounterGroup();

    public class XvlrUpdHander extends SimpleChannelHandler {

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
            try {
                ChannelBuffer channelBuffer = (ChannelBuffer)mEvent.getMessage();
                int actualSizeOfUdpPacket = channelBuffer.readableBytes();
                byte[] body = Arrays.copyOf(channelBuffer.array(), actualSizeOfUdpPacket);
                Event xvlrPacketEvent = EventBuilder.withBody(body);
                LOG.debug("Event.body length is: {} ", xvlrPacketEvent.getBody().length);
                if(xvlrPacketEvent == null){
                    return;
                }
                getChannelProcessor().processEvent(xvlrPacketEvent);
                counterGroup.incrementAndGet("events.success");
            } catch (ChannelException ex) {
                counterGroup.incrementAndGet("events.dropped");
                LOG.error("Error writting to channel", ex);
                return;
            }
        }
    }


    @Override
    public void start() {
        OioDatagramChannelFactory oioDatagramChannelFactory =   new OioDatagramChannelFactory(                                                                           Executors.newCachedThreadPool());
        ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(oioDatagramChannelFactory);
        serverBootstrap.setOption("sendBufferSize", 65536);
        serverBootstrap.setOption("receiveBufferSize", 65536);
        serverBootstrap.setOption("receiveBufferSizePredictorFactory",
                                    new AdaptiveReceiveBufferSizePredictorFactory(8192, 8192, 16384));


        final XvlrUpdHander handler = new XvlrUpdHander();

        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(handler);
            }
        });
        if (host == null) {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
        } else {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
        }

    }

        @Override
    public void stop() {
        LOG.info("Syslog UDP Source stopping...");
        LOG.info("Metrics:{}", counterGroup);
        if (nettyChannel != null) {
            nettyChannel.close();
            try {
                nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.warn("netty server stop interrupted", e);
            } finally {
                nettyChannel = null;
            }
        }

        super.stop();
    }

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, "port");
        port = context.getInteger("port");
        host = context.getString("host");
   }
}
于 2012-12-13T09:54:15.087 回答
0

要么您发送 768 字节,要么接收缓冲区只有 768 字节长。它当然与回车无关,除非您的代码中有一些错误的处理。

于 2012-12-09T20:49:08.573 回答