0

我正在使用storm-kafka-client 1.1.1 和storm-core 1.1.0。

我已经调整了以下参数,但无法启用背压并降低 kafka-spout 的摄取率。

Spout每秒消耗2000条消息。

下游 Bolt 处理一条消息需要50 毫秒,即每秒处理 20 条消息。

spout 发出的元组和bolt 执行的元组之间的延迟随着时间的推移而增加。

**如何让 Spout 每秒读取 20 条消息并保持其消耗率与 Bolt 的执行率相同**

   **Topology**

   topology.max.spout.pending= **5** , 
   topology.message.timeout.secs= **600** , 
   topology.executor.send.buffer.size=**64** , 
   topology.executor.receive.buffer.size=**64** , 
   topology.transfer.buffer.size=**64**

   **KafkaSpoutConfig**

   setPollTimeoutMs(**200**) , 
   setFirstPollOffsetStrategy(latest) , 
   setMaxUncommittedOffsets(**2_000_000**) , 
   setGroupId(groupName) , 
   setProp("fetch.max.wait.ms",**1000**) , 
   setProp("max.poll.records", **100**) , 
   setMaxPartitionFectchBytes(**512**)  , 
   setProp("send.buffer.bytes", **512**) , 
   setProp("receive.buffer.bytes", **512**) , 
   setPartitionRefreshPeriodMs(30_000).setProp("enable.auto.commit", "true") , 
   setProp("session.timeout.ms", "**60000**") , 

   KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(**50**) ,
   KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(**5**) , 1 ,
   KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(**1**) ) ;

我不确定应该为TOPOLOGY_SPOUT_WAIT_STRATEGYBACKPRESSURE_DISRUPTOR_HIGH_WATERMARK设置什么值

那么上面的参数和值的什么组合可以帮助控制 spout 摄取率?

任何建议将不胜感激。

谢谢卡尼斯卡

4

1 回答 1

3

TOPOLOGY_SPOUT_WAIT_STRATEGY 仅在要求 spout 发出新元组时使用,并且它不发出任何内容(即,如果没有新消息)。它不应该对背压有任何影响。

我对当前的背压实现不太熟悉,但我很确定您需要使用 TOPOLOGY_BACKPRESSURE_ENABLE 显式启用它。

BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK 是一个比率,因此如果您将其设置为例如 0.9,它将在螺栓的输入队列已满 90% 时限制喷口。您可以在https://github.com/apache/storm/blob/1.1.x-branch/storm-core/src/jvm/org/apache/storm/Config.java和默认值中找到设置文档在https://github.com/apache/storm/blob/1.1.x-branch/conf/defaults.yaml

为了避免一次发出过多的元组,我认为您应该将 topology.max.spout.pending 设置为一些合理数量的元组(可能是几百个?)。确保您的拓扑设置为启用确认(即设置 topology.enable.message.timeouts 为 true)。否则 max spout pending 无效。

不确定为什么要更改执行程序缓冲区大小。

您还应该考虑将 Storm 和storm-kafka-client 升级到至少1.1.2。最近对storm-kafka-client进行了很多修复,如果您升级,您可能会更轻松地使用它。

我不确定您代码中的星星是什么意思?

于 2018-02-23T14:10:37.283 回答