我需要处理来自 AWS Kinesis 流的数据,该流从设备收集事件。在过去 10 秒内收到的所有事件必须每秒调用处理函数。
说,我有两个设备 A 和 B 将事件写入流。我的过程的名称为 MyFunction 并采用以下参数:
- 设备ID
- 一段时间的数据数组
如果我在 10:00:00 开始处理(并且在过去 10 秒内已经积累了设备 A 和 B 的事件),那么我需要拨打两个电话:
- MyFunction(А, {设备 A 从 09:59:50 到 10:00:00 的事件})
- MyFunction(B, {设备 B 从 09:59:50 到 10:00:00 的事件})
下一秒,10:00:01
- MyFunction(А, {设备 A 从 09:59:51 到 10:00:01 的事件})
- MyFunction(B, {设备 B 从 09:59:51 到 10:00:01 的事件})
等等。
看起来累积从设备接收到的所有数据的最简单方法是将其内存存储在临时缓冲区中(当然只有最后 10 秒),所以我想先尝试一下。
我发现保存这种基于内存的缓冲区的最方便的方法是创建一个基于 Java Kinesis Client Library (KCL) 的应用程序。
我也考虑过基于 AWS Lambda 的解决方案,但看起来不可能将数据保存在内存中以供 lambda 使用。Lambda 的另一种选择是拥有 2 个函数,第一个函数必须将所有数据写入 DynamoDB,第二个函数每秒被调用以处理从数据库中获取的数据,而不是从内存中获取的数据。(所以这个选项要复杂得多)
所以我的问题是:实现这种处理的其他选择是什么?