5
  • 我正在使用 Spark Streaming 2.2.0 和spark-streaming-kinesis-asl_2.11使用 Kinesis 流。
  • Kinesis Stream 有 150 个分片,我正在监控GetRecords.IteratorAgeMillisecondsCloudWatch 指标以查看消费者是否跟上流。
  • Kinesis Stream 的默认数据保留时间为 86400 秒(1 天)。
  • 我正在调试一个案例,其中一些 Kinesis Shard 达到最大值GetRecords.IteratorAgeMilliseconds86400000(== 保留期)
  • 这仅适用于某些分片(我们称它们为过时分片),而不是所有分片。

我已经为过时的 shards确定了 shardIds 。其中之一是shardId-000000000518,我可以在包含以下检查点信息的 DynamoDB 表中看到:

leaseKey: shardId-000000000518
checkpoint: 49578988488125109498392734939028905131283484648820187234
checkpointSubSequenceNumber: 0
leaseCounter: 11058
leaseOwner: 10.0.165.44:52af1b14-3ed0-4b04-90b1-94e4d178ed6e    
ownerSwitchesSinceCheckpoint: 37
parentShardId: { "shardId-000000000269" }

我可以在 10.0.165.44 的 worker 日志中看到以下内容:

17/11/22 01:04:14 INFO Worker:当前流分片分配:shardId-000000000339,...,shardId-000000000280,shardId-000000000518

...这应该意味着shardId-000000000518已分配给该工作人员。但是,我从未在此 shardId 的日志中看到任何其他内容。如果工人没有从这个 shardId 消费(但它应该),这可以解释为什么GetRecords.IteratorAgeMilliseconds永远不会减少。对于其他一些(非过时的 shardIds),我可以在日志中看到

17/11/22 01:31:28 INFO SequenceNumberValidator:已验证的序列号 49578988151227751784190049362310810844771023726275728690,分片 ID 为 shardId-00000000033

我确实通过查看 IncomingRecords CloudWatch 指标来验证过时的分片是否有数据流入其中。

我该如何调试/解决这个问题?为什么这些 shardIds 永远不会被 Spark 工作人员拾取?

4

0 回答 0