0

我正在使用水槽 JMS 源从 ActiveMQ 出列消息并将此消息转换为List<Event>使用自定义转换器

通道配置

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1500

当大小List<Event>小于或等于 1500(通道事务容量)时,flume 将事件写入通道,但如果事件大小大于 1500,则出现以下异常

错误日志

21 Apr 2015 12:19:28,245 WARN  [PollableSourceRunner-JMSSource-s1] (org.apache.flume.source.jms.JMSSource.doProcess:263)  - Error appending event to channel. Channel might be full. Consider increasing the channel capacity or make sure the sinks perform faster.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:257)
    at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:54)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1500 full, consider committing more frequently, increasing capacity or increasing thread count
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doPut(MemoryChannel.java:84)
    at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
    at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
    ... 4 more

如何解决这个问题呢?

注意:事件大小根据 ActiveMQ 消息动态变化

4

0 回答 0