这是我正在处理的异步功能。这里的思路是连接Redis,里面有key-val对,val是json对象或者list,需要进一步处理。
这些 key-val 对由上游进程生成,该进程使用 Redis 作为临时存储。输入速率约为每分钟 5000 对。
这里的过程是 while 循环连续运行(使用 cou 测试 1 次迭代),检查 dbsize 并使用异步过程处理一批键。
async def main():
try:
# Connect Redis
r = redis.StrictRedis(host='localhost', port=6379, db=1)
loop = asyncio.get_event_loop()
# Testing with cou = 1 for 1 iteration, this should be "while True:" or similar long running condition.
cou = 1
while(cou > 0):
keyNum = r.dbsize()
if(keyNum > 0):
# We have keys. Process!
for key in r.scan_iter(count=10):
taskStr = r.get(key)
tasks = json.loads(taskStr)
if isinstance(tasks, list):
for t in tasks:
await asyncio.create_task(async_process(t))
else:
await asyncio.create_task(async_process(tasks))
r.delete(key)
else:
# We should wait for a bit and check again
await asyncio.sleep(5)
cou = cou - 1
# We should close now.
logger.info("Done processing data. Closing.")
except:
logger.error("({})".format(traceback.format_exc()))
logger.error("Error in MAIN. We should exit.")
finally:
loop.stop()
现在我在测试这段代码时的期望是 scan_iter 会给我一批 10 个键,这些键将被处理,并且 cou 将递减并退出 while 循环。但它返回了所有内容,并且所有内容都立即得到处理。
我的问题:
scan_iter 是否会出现这种行为?有没有办法强制 scan_iter 只返回特定数量的键?
如果 Redis 中有数百万个键,这种行为不会导致性能问题吗?如何避免这种性能下降?
有没有更优雅的方式来处理 Redis 中传入数据的批处理?