我编写了一个流程,通过Hortonworks提供的自定义水槽源和Flume Kafka接收器生成消息以写入Kafka 代理。
在此过程中,我注意到如果 KAFKA 代理已经在运行,然后我启动我的 Flume 代理,它会正确地将每条消息传递给 Kafka 代理,但是当我在 Flume 代理已经运行时启动 Kafka 代理时,KAFKA 代理无法接收所有消息。
当我运行Kafka 控制台消费者来检查收到的消息计数时,我注意到它从一开始就丢弃了很少的记录,从结尾丢弃了很少的记录。
我在Flume.conf中尝试了多种混合和匹配,但它仍然按预期工作。
以下是我提供给 Flume.conf 的配置参数 -
agent.channels = firehose-channel
agent.sources = stress-source
agent.sinks = kafkasink
#################################
# Benchmark Souce Configuration #
#################################
agent.sources.stress-source.type=com.kohls.flume.source.stress.BenchMarkTestScenriao
agent.sources.stress-source.size=5000
agent.sources.stress-source.maxTotalEvents=30000
agent.sources.stress-source.batchSize=200
agent.sources.stress-source.throughputThreshold=4000
agent.sources.stress-source.throughputControlSeconds=1
agent.sources.stress-source.channels=firehose-channel
#################################
# Firehose Channel Configuration #
#################################
agent.channels.firehose-channel.type = file
agent.channels.firehose-channel.checkpointDir = /data/flume/checkpoint
agent.channels.firehose-channel.dataDirs = /data/flume/data
agent.channels.firehose-channel.capacity = 10000
agent.channels.firehose-channel.transactionCapacity = 10000
agent.channels.firehose-channel.useDualCheckpoints=1
agent.channels.firehose-channel.backupCheckpointDir=/data/flume/backup
############################################
# Firehose Sink Configuration - Kafka Sink #
############################################
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.topic = backoff_test_17
agent.sinks.kafkasink.channel=firehose-channel
agent.sinks.kafkasink.brokerList = sandbox.hortonworks.com:6667
agent.sinks.kafkasink.batchsize = 200
agent.sinks.kafkasink.requiredAcks = 1
agent.sinks.kafkasink.kafka.producer.type = async
agent.sinks.kafkasink.kafka.batch.num.messages = 200
我还尝试分析水槽日志并注意到水槽指标正确显示了PUT和TAKE计数。
请让我知道是否有人有任何解决此问题的指示。
提前感谢您的帮助。