在我的 Java 守护程序应用程序中,我一直在读取来自 Kafka 主题的事件,该主题具有跨多个服务器的 100 多个分区(具有高级消费者组)。所以我需要聚合每个事件名称每分钟的事件计数并将其刷新到时间序列数据库。请注意,事件时间戳可能会出现故障,并且可能落后于消费者的当前时间。活动形式如下:
Timestamp (in ms but showing in text for readability purpose ) event count
yyyy/moth/day HH:mm:ss
2015/01/01 00:03:35 E2 100
2015/01/01 00:01:35 E1 200
2015/01/01 00:00:35 E2 300
2015/01/01 00:01:27 E2 700
2015/01/01 00:00:23 E2 400
2015/01/01 00:00:30 E1 500
2015/01/01 00:00:50 E1 600
我必须在存储引擎之前进行预聚合(计数存储在任何时间序列数据库中)。
我会将以下聚合存储在存储引擎中(地板(时间戳)分钟):
2015/01/01 00:03:00 E2 100
2015/01/01 00:01:00 E1 200
2015/01/01 00:01:00 E2 700
2015/01/01 00:00:00 E1 1100
2015/01/01 00:00:00 E2 100
我已经评估了代码 hale 指标和 statsD,(石墨 collectD(不是选项)但所有这些库的问题是它们实时聚合事件这是不可能的。所以我正在考虑使用 LRUConcurrentHashMap 作为数据结构来保存计数和每分钟刷新此映射到存储。我还必须保持 LRU 结构完整 1 小时左右,因为数据计数由于滞后或落后或无序而延迟。
您是否知道任何开源库可以做到这一点或任何更好的聚合和刷新方法?