5

我正在尝试为 flume-ng 编写一个自定义接收器。我查看了现有的接收器和文档并将其编码。但是,应该接收事件的 'process()' 方法总是以 null 结束。我在做 Event event = channel.take(); 但该事件为空。我在日志中看到该方法被重复调用,因为事件仍在通道中。

有人可以指出我正确的方向吗?

4

2 回答 2

5

这是一个流程功能的骨架......如果您无法获得回滚的事件,请将状态更改为BACKOFF。如果不是,您提交并将状态设置为READY。无论如何,您总是关闭交易。

    Status status = null;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
        Event event = channel.take();

        if (event != null && validEvent(event.getBody()) >= 0) {
           # make some printing
        }
        transaction.commit();
        status = Status.READY;
    } catch (Throwable ex) {
        transaction.rollback();
        status = Status.BACKOFF;
        logger.error("Failed to deliver event. Exception follows.", ex);
        throw new EventDeliveryException("Failed to deliver event: " + ex);
    } finally {
        transaction.close();
    }
    return status;

我相信这会奏效:)。

于 2013-08-30T08:17:10.270 回答
4

这是设计使然。接收器运行器将使用事件轮询接收器,null以便确保接收器处于活动状态并准备好接受未来的事件。当您收到null事件时,请确保您返回Status.BACKOFF,并且接收器处理器将在重试之前稍等片刻。

于 2013-03-27T11:41:01.960 回答