0

我正在尝试使用 pymongo 来监听更改流,同时对故障/重启有弹性。因此,我将恢复令牌存储在 Redis 中以供以后重用。

这是我尝试过的代码

# Skipping pymongo and redis initialisation

if redis.exists('resumeToken'):
   resume_token = pickle.loads(redis.get('resumeToken'))
else:
   resume_token = None
stream = db.my_collection.watch(
            pipeline=[{'$match': {'fullDocument.type': 'something'}}],
            resume_after=resume_token,
            full_document='updateLookup',
            max_await_time_ms=500
        )
change = stream.next()
redis.set('resumeToken', pickle.dumps(change['_id']))
print(change)

第一次运行时,打印了一个更改,我得到了一些存储在 Redis 中的内容。

但是如果我重新运行,我收到以下错误:

pymongo.errors.OperationFailure: The resume token UUID does not exist. Has the collection been dropped?

我正在使用 MongoDB 3.6.1 和 pymongo 3.7.1-1.1。

任何人都知道为什么我的简历令牌被拒绝?

4

1 回答 1

0

首先,这不是用于恢复变更流的正确模式。请参阅https://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming-a-change-stream,根据需要适应 Python 或查看 pymongo 文档以获取等价物。

其次,resume token 是一个文档。如果天真地酸洗和解酸它不起作用,首先将其转换为原生 Python 数据类型(例如,序列化为扩展的 JSON 文档),然后酸洗/解酸该序列化。恢复时,您需要重新创建正确类型的实例。

第三,验证在你阅读了你写到 redis 的内容之后,你会得到你写的内容。Redis 不是通用的持久性系统。

于 2021-01-30T02:03:06.370 回答