10

好的,我将从一个详细的用例开始,并解释我的问题:

  1. 我使用第 3 方 Web 分析平台,该平台利用AWS Kinesis流将数据从客户端传递到最终目的地 - Kinesis 流;
  2. 网络分析平台使用 2 个流:
    1. 一个数据收集器流(单个分片流);
    2. 第二个流,用于丰富来自收集器流(单个分片流)的原始数据;最重要的是,这个流使用TRIM_HORIZON迭代器类型消耗来自第一个流的原始数据;
  3. 我使用AWS Java SDK使用流中的数据,具体使用GetShardIteratorRequest该类;
  4. 我目前正在开发提取类,所以这是同步完成的,这意味着我只在编译类时才使用数据;
  5. 该类令人惊讶地工作,尽管有些事情我无法理解,特别是关于如何从流中消耗数据以及每种迭代器类型的含义;

我的问题是我检索到的数据不一致,并且其中没有时间逻辑。

  • 当我使用AT_SEQUENCE_NUMBER并提供分片中的第一个序列号时

    .getSequenceNumberRange().getStartingSequenceNumber();

    ...作为``,我没有得到所有记录。同样,AFTER_SEQUENCE_NUMBER;

  • 当我使用LATEST时,我得到零结果;
  • 当我使用TRIM_HORIZON应该有意义的使用时,它似乎无法正常工作。它曾经为我提供数据,然后我添加了新的“事件”(记录到最终流中)并且我收到了零记录。神秘。

我的问题是:

  1. 如何安全地使用流中的数据,而不必担心丢失记录?
  2. 有没有替代品ShardIteratorRequest
  3. 如果有,我如何才能“浏览”流并查看其中的内容以进行调试参考?
  4. TRIM_HORIZON该方法缺少什么?

在此先感谢,我真的很想从 Kinesis 流中了解更多有关数据消耗的信息。

4

1 回答 1

6

我理解上面的困惑,我也有同样的问题,但我想我现在已经弄清楚了。请注意,我在没有 KCL 的情况下直接使用JSON API 。

我似乎 API 在客户开始使用流时为他们提供了 2 种基本的迭代器选择:

A) TRIM_HORIZON:用于读取延迟数分钟(甚至数小时)到 24 小时之间的过去记录。它不会返回最近放置的记录。即使最近已 PUT 记录,在此迭代器看到的最后一条记录上使用 AFTER_SEQUENCE_NUMBER 也会返回一个空数组。

B) LATEST:用于实时读取 FUTURE 记录(在 PUT 之后立即)。我被我在这个“在分片中最近的记录之后开始阅读,以便您始终阅读分片中的最新数据”中找到的唯一文档中的一句话欺骗了。你得到一个空数组,因为自从获得迭代器后没有记录被 PUT。如果您获得这种类型的迭代器,然后 PUT 一条记录,则该记录将立即可用。

最后,如果您知道最近放置的记录的序列 ID,则可以使用 AT_SEQUENCE_NUMBER 立即获取它,并且可以使用 AFTER_SEQUENCE_NUMBER 获取以后的记录,即使它们不会出现在 TRIM_HORIZON 迭代器中。

上面确实意味着,如果你想实时读取所有已知的过去记录和未来记录,你必须使用 A 和 B 的组合,并用逻辑来处理介于两者之间的记录(最近的过去)。KCL 很可能会解决这个问题。

于 2015-03-23T01:26:16.097 回答