0

用例

使用 Apache Storm 将 Kafka 消息持久化到 S3

到目前为止的故事

  • 我尝试使用 secor(https://github.com/pinterest/secor),工作正常,达到目的。但是对于经理来说,这可能是一种维护过度的行为(正如他们所说的那样,他总是对的)
  • 我们已经有 Apache Kafka-Apache Storm 稳定集群,因此计划利用该基础设施。

议程和问题

  • 来自 Kafka 的消息将在 Storm bolt 中进行批处理,并以文件的形式写入本地磁盘

  • 在一定的时间间隔和/或大小标准之后,它将被上传到 S3

  • 为了管理故障,每个螺栓应该能够跟踪 Kafka 分区并在理想情况下每个元组偏移,因为螺栓将随机分布在整个集群中。

  • Partition / Offsets 可以持久化到 Zookeeper,但首先如何从 Tuple 中获取它们?除了将它们转发到 Kafka Spout 之外,还有其他方法吗?

4

2 回答 2

0

配置KafkaSpoutwith org.apache.storm.kafka.StringMessageAndMetadataSchemewhich 将偏移量和分区添加到 Spouts 发出的值

于 2016-10-28T19:05:51.417 回答
0

Kafka spout 已经在 zookeeper 中跟踪主题的偏移量,所以你不需要在 bolt 中实现这个逻辑。

Kafka spout 将发出元组,拓扑将跟踪它。当元组被螺栓确认时,它通过了。Spout 将考虑交付的元组。在 emmiting tuples spout 后面将跟踪 zookeeper 中的当前偏移量,因此如果出现问题,您可以开始读取消息而不是从开始。

上述拓扑将保证至少交付一次。使用三叉戟拓扑,您可以保证只交付一次。在这两种情况下,看看topology.max.spout.pending设置。正确设置它至关重要,因为您将使用批处理。

于 2016-05-04T20:21:41.457 回答