I have the following Python block that fetches records placed on a Kinesis stream and then puts them into an S3 bucket. The stream has a single shard.

    # Fetching the shard iterators from the Kinesis stream
    shard_iterators = []
    if response and 'StreamDescription' in response:
        for shard_id in response['StreamDescription']['Shards']:
            shard_id = shard_id['ShardId']
            shard_iterator = kinesis_connection.get_shard_iterator(stream_name, shard_id, 'LATEST')
            shard_iterators.append(shard_iterator['ShardIterator'])

    # Iterating over the Kinesis stream and pushing data to S3
    bucket = s3_connection.get_bucket(bucket_name)
    k = Key(bucket)
    for shard_iterator in shard_iterators:
        while 1:
            response = kinesis_connection.get_records(shard_iterator)
            shard_iterator = response['NextShardIterator']
            if len(response['Records']) > 0:
                for res in response['Records']:
                    k.key = datetime.datetime.now().strftime('%Y/%m/%d/') + res['SequenceNumber']
                    k.set_contents_from_string(res['Data'])

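Data is initially pulled from the stream and pushed to S3, but at some point I receive

    TypeError: expected string or buffer

on the line

    response = kinesis_connection.get_records(shard_iterator)

There are still more records on the shard for me to pull. Has anyone run into this before, and/or does anyone have ideas on what I should try to resolve it?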