7

当批处理螺栓完成处理批处理时,仅提交每个分区的最高偏移量的正确方法是什么?我主要担心的是机器在处理批次时会死机,因为整个 shebang 将在 AWS 现场实例中运行。

我是 Storm 开发的新手,我似乎找不到 IMO 的答案是非常直接地使用 kafka 和storm。

设想:

基于保证消息处理指南,假设我有一个元组的蒸汽(kafka 主题),("word",count)处理 X 元组的批处理螺栓,进行一些聚合并创建 CSV 文件,将文件上传到 hdfs/db 和 acks。

在非 strom“天真”实现中,我会读取 X msgs(或读取 Y 秒),聚合,写入 hdfs,一旦上传完成,将每个分区的最新(最高)偏移量提交给 kafka。如果机器或进程在 db 提交之前死亡 - 下一次迭代将从前一个位置开始。

在暴风雨中,我可以创建批处理螺栓,它将锚定所有批处理元组并立即确认它们,但是我找不到将每个分区的最高偏移量提交给 kafka 的方法,因为喷口不知道批处理,所以一旦批处理螺栓确认元组,每个喷口实例都会一个接一个地确认他的元组,所以我可以按照我的看法:

  1. 在 spout 的每个 ack 上提交 acked 消息的偏移量。这将导致许多提交(每批可能是几 K 的元组),可能是乱序的,如果在提交偏移量时 spout 工作死了,我最终将部分替换一些事件。
  2. 与 1 相同。但我可以在提交的最高偏移量中添加一些本地偏移量管理(修复无序偏移量提交)并提交每隔几秒看到的 highets 偏移量(减少大量提交)但我仍然可以部分结束如果 spout 死亡,提交的偏移量
  3. 将偏移量提交逻辑移动到螺栓 - 我可以将每条消息的分区和偏移量添加到发送到批处理螺栓的数据中,并将每个分区的最高已处理偏移量作为批处理的一部分提交(发送到“偏移提交者”螺栓处批次结束)。这将解决偏移跟踪、多次提交和局部重播问题,但这会为螺栓添加特定于 kafka 的逻辑,从而将螺栓代码与 kafka 相结合,一般来说,在我看来,这似乎是在重新发明轮子。
  4. 更进一步的轮子改造和手动管理 ZK 中最高处理的 patition-offset 组合,并在我初始化 spout 时读取这个值。
4

1 回答 1

0

你的问题有很多,所以不确定这是否完全解决了它,但如果你担心发送给 kafka 的确认数量(例如,在每条消息之后),你应该能够设置消费的批量大小,例如例如 1000 可以减少很多。

于 2021-07-22T13:57:54.243 回答