
I have set up a Kinesis stream in Amazon Web Services. I also want to accomplish the following tasks:

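  1. Put records into a single stream using a single shard (C# API) - success
  2. Write a sample application in which multiple producers work on different streams - success
  3. Set up a sample application in which multiple workers put data into a single stream - success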

I would also like to be able to enforce SequenceNumberOrdering on the records.

But the real pain is the GetRecords consumer operation in the Kinesis C# API.

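I created a sample application for reading the records. The problem is that it never stops iterating, even when there are no records in the Kinesis stream. Also, saving the SequenceNumber to a database or a file and reading it back again is very time-consuming. What, then, is the advantage of using a Kinesis stream with GetRecords?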

Why does it keep iterating even when there is no data in the stream?

I am using the following code as a reference:

        private static void GetFilesKinesisStream()
        {
            IAmazonKinesis kinesis = AWSClientFactory.CreateAmazonKinesisClient();
            try
            {
                ListStreamsResponse listStreams = kinesis.ListStreams();
                int numBuckets = 0;
                if (listStreams.StreamNames != null &&
                    listStreams.StreamNames.Count > 0)
                {
                    numBuckets = listStreams.StreamNames.Count;
                    Console.WriteLine("You have " + numBuckets + " Amazon Kinesis Streams.");
                    Console.WriteLine(string.Join(",\n", listStreams.StreamNames.ToArray()));

                    DescribeStreamRequest describeRequest = new DescribeStreamRequest();
                    describeRequest.StreamName = "******************";

                    DescribeStreamResponse describeResponse = kinesis.DescribeStream(describeRequest);
                    List<Shard> shards = describeResponse.StreamDescription.Shards;
                    foreach (Shard s in shards)
                    {
                        Console.WriteLine("shard: " + s.ShardId);
                    }

                    string primaryShardId = shards[0].ShardId;

                    GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
                    iteratorRequest.StreamName = "*********************";
                    iteratorRequest.ShardId = primaryShardId;
                    iteratorRequest.ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER;
                    iteratorRequest.StartingSequenceNumber = "49544005271533118105145368110776211536226129690186743810";

                    GetShardIteratorResponse iteratorResponse = kinesis.GetShardIterator(iteratorRequest);
                    string iterator = iteratorResponse.ShardIterator;

                    Console.WriteLine("Iterator: " + iterator);
                    //Step #3 - get records in this iterator
                    GetShardRecords(kinesis, iterator);

                    Console.WriteLine("All records read.");
                    Console.ReadLine();
                }
                // sr.WriteLine("You have " + numBuckets + " Amazon S3 bucket(s).");
            }
            catch (AmazonKinesisException ex)
            {
                if (ex.ErrorCode != null && ex.ErrorCode.Equals("AuthFailure"))
                {
                    Console.WriteLine("The account you are using is not signed up for Amazon EC2.");
                    Console.WriteLine("You can sign up for Amazon EC2 at http://aws.amazon.com/ec2");
                }
                else
                {
                    Console.WriteLine("Caught Exception: " + ex.Message);
                    Console.WriteLine("Response Status Code: " + ex.StatusCode);
                    Console.WriteLine("Error Code: " + ex.ErrorCode);
                    Console.WriteLine("Error Type: " + ex.ErrorType);
                    Console.WriteLine("Request ID: " + ex.RequestId);
                }
            }
        }

        private static void GetShardRecords(IAmazonKinesis client, string iteratorId)
        {
            // create request
            GetRecordsRequest getRequest = new GetRecordsRequest();
            getRequest.Limit = 100;
            getRequest.ShardIterator = iteratorId;


            //call "get" operation and get everything in this shard range
            GetRecordsResponse getResponse = client.GetRecords(getRequest);
            //get reference to next iterator for this shard
            string nextIterator = getResponse.NextShardIterator;
            //retrieve records
            List<Record> records = getResponse.Records;

            //print out each record's data value
            foreach (Record r in records)
            {
                //pull out (JSON) data in this record
                string s = Encoding.UTF8.GetString(r.Data.ToArray());
                Console.WriteLine("Record: " + s);
                Console.WriteLine("Partition Key: " + r.PartitionKey);
            }

            if (null != nextIterator)
            {
                //if there's another iterator, call operation again
                GetShardRecords(client, nextIterator);
            }
        }

1 Answer


Why does the Kinesis consumer keep iterating after the data "ends"?

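Because there is no "end". Kinesis is somewhat like a queue, but not exactly. Think of it as a moving time window of recorded events. You don't consume records; you passively examine the records currently in the window (Amazon hardcodes the window to 24 hours). Because the window is always moving, once you reach the "last" record the iterator simply keeps watching in real time. New records could appear at any moment; the consumer has no way of knowing that there are no producers left.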
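Since an open shard always hands back a fresh `NextShardIterator`, the recursive `GetShardRecords` in the question never reaches its `null` check, and every empty batch adds another stack frame. A common alternative is a plain loop that pauses between calls and exits on an explicit condition. The sketch below is hedged: to stay self-contained it takes the fetch step as a delegate rather than calling the AWS SDK, and `ShardPoller`, `PollShard`, `fetch` and `shouldStop` are hypothetical names, not part of the Kinesis API. Per the GetRecords documentation, `NextShardIterator` is null only once a shard has been closed (for example after a split or merge), and each shard allows at most five read calls per second, which is why the loop sleeps.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ShardPoller
{
    // Hypothetical helper, not part of the AWS SDK. Reads one shard in a loop
    // instead of recursing, so the call stack does not grow with every batch.
    // fetch(iterator) stands in for client.GetRecords(...) and must return the
    // decoded records plus NextShardIterator (null only once the shard is closed).
    public static int PollShard(
        string iterator,
        Func<string, (List<string> Records, string NextIterator)> fetch,
        Func<string, bool> shouldStop,
        TimeSpan delayBetweenCalls)
    {
        int processed = 0;
        while (iterator != null)
        {
            var (records, next) = fetch(iterator);
            foreach (string record in records)
            {
                Console.WriteLine("Record: " + record);
                processed++;
                // Kinesis never reports "no more producers", so the stop
                // decision has to come from the data itself.
                if (shouldStop(record))
                {
                    return processed;
                }
            }

            iterator = next;
            // An empty batch is normal at the tip of an open shard. Pausing
            // between calls keeps the reader under the per-shard read limit.
            if (iterator != null)
            {
                Thread.Sleep(delayBetweenCalls);
            }
        }

        // Only reached when the shard was closed and fully read.
        return processed;
    }
}
```

With the SDK, `fetch` could wrap `client.GetRecords(new GetRecordsRequest { ShardIterator = it, Limit = 100 })` and return the UTF-8 decoded `Records` together with `NextShardIterator`, combined with a delay of roughly one second. Passing `r => false` as `shouldStop` reproduces the "never stops" behaviour described in the question, without the unbounded recursion.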

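If you want to stop based on some condition, that condition has to be included in your payload. For example, if you want to stop once you reach "now", part of your payload could be a timestamp, and the consumer checks whether that timestamp is close to its own current time.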
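A minimal sketch of that timestamp check, assuming the producer writes each record as JSON with a hypothetical `timestamp` field in ISO 8601 format. The field name, the `StopCondition.IsCaughtUp` helper and the tolerance value are illustrative choices, not anything Kinesis defines. The consumer decodes the record bytes (as the question's code already does with `Encoding.UTF8.GetString`) and treats itself as caught up once a record was produced within the tolerance of its own clock.

```csharp
using System;
using System.Text.Json;

public static class StopCondition
{
    // Hypothetical payload shape written by the producer, e.g.
    // {"timestamp":"2014-10-30T16:39:36Z","body":"..."}
    // Returns true once a record was produced within `tolerance` of nowUtc,
    // i.e. the reader has caught up to roughly "now".
    public static bool IsCaughtUp(string json, DateTime nowUtc, TimeSpan tolerance)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        DateTimeOffset produced = doc.RootElement
            .GetProperty("timestamp")
            .GetDateTimeOffset();
        // Compare in UTC so producer and consumer time zones do not matter.
        return nowUtc - produced.UtcDateTime <= tolerance;
    }
}
```

Because producer and consumer clocks are never perfectly in sync, the tolerance should be generous (seconds rather than milliseconds), and `nowUtc` would normally be `DateTime.UtcNow`.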

answered 2014-10-30T16:39:36.763