2

我正在尝试使用 Streamparse 在 Python 中编写一个简单的 Storm 拓扑。除了我写的简单的 Kafka spout 之外,一切都对我有用——它似乎只是不断地调用“next_tuple”。我的螺栓相当慢,所以系统似乎很快在内存中爆炸。

启动拓扑时,我尝试将 topology.max.spout.pending 设置为 1,以防止它向拓扑中添加太多消息。

lein run -m streamparse.commands.run/-main topologies/.clj -t 100 --option 'topology.max.spout.pending=1' --option 'topology.workers=1' --option 'topology.acker.executors=1' 

然而,结果仍然是这样,尽管螺栓要慢得多:

24790 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24942 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24944 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24946 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25143 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25144 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25350 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
......

我简单的卡夫卡喷口:

class MetadataSpout(Spout):

    def initialize(self, stormconf, context):
        self.log('----CONFIG: %s----' % stormconf)
        k = KafkaClient(os.getenv('KAFKA'))
        self.consumer = SimpleConsumer(k, 'vacuum', 'metadata')

    def next_tuple(self):
        self.log('----NEXT TUPLE----')
        messages = self.consumer.get_messages(count=os.getenv('BATCH_COUNT', 20))
        self.emit([json.dumps([m.message.value for m in messages])])

我的螺栓只有默认配置,但需要大量时间才能完成 process() 方法。我无法弄清楚它们是如何成为问题的,但如果它们相关,我可以发布。

4

1 回答 1

2

解决了,感谢伟大的 Streamparse 团队

“topology.max.spout.pending 仅在您的 spout 可靠时才有效。您需要指定要发出的可选 tup_id 参数,以便为每个元组提供唯一的 ID。一旦您这样做,一切都会好起来的。”

为发出的元组指定 UUID 后,此问题已解决。

于 2015-03-26T22:59:38.873 回答