0

我有以下 Python 块来获取已放置到 Kinesis 流中的记录,然后将记录放入 S3 存储桶中。此流有一个分片。

# Fetching the shard iterators from the Kinesis stream
shard_iterators = []
if response and 'StreamDescription' in response:
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = kinesis_connection.get_shard_iterator(stream_name, shard_id, 'LATEST')
        shard_iterators.append(shard_iterator['ShardIterator'])

# Iterating over the Kinesis stream and pushing data to S3
bucket = s3_connection.get_bucket(bucket_name)
k = Key(bucket)

for shard_iterator in shard_iterators:
    while 1:
        response = kinesis_connection.get_records(shard_iterator)
        shard_iterator = response['NextShardIterator']
        if len(response['Records'])> 0:
            for res in response['Records']:
                k.key = datetime.datetime.now().strftime('%Y/%m/%d/') + res['SequenceNumber']
                k.set_contents_from_string(res['Data'])

数据最初是从流中提取并推送到 S3,但在某些时候,我TypeError: expected string or buffer在线接收response = kinesis_connection.get_records(shard_iterator)。分片上还有更多记录可供我提取。以前有没有人遇到过这个问题和/或是否有人对我应该尝试解决的问题有什么想法?

4

1 回答 1

0

该错误似乎是由 json 解码非字符串引发的。例如None,或整数。

import json
json.loads(None)
...
File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
   obj, end = self.raw_decode(s, idx=_w(s, 0).end())
TypeError: expected string or buffer

这可能在这里发生:https ://github.com/boto/boto/blob/develop/boto/kinesis/layer1.py#L705

这意味着返回的响应主体是None.

可能是由于您的 shard_iterator 即将到期。分片迭代器仅持续 5 分钟,因此如果您需要很长时间来处理当前批次的记录,或者以其他方式停滞,可能就是这样。参见:http ://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

另一种可能性是它是一个间歇性的身份验证/连接错误,我以前见过,它可能会产生虚假异常,这可能会导致相同的 boto 代码路径?

尽管如果您可以提供更多堆栈跟踪和您看到的任何日志消息,那将会很有帮助。

于 2014-06-18T09:45:21.887 回答