0

这是我正在处理的异步功能。这里的思路是连接Redis,里面有key-val对,val是json对象或者list,需要进一步处理。

这些 key-val 对由上游进程生成,该进程使用 Redis 作为临时存储。输入速率约为每分钟 5000 对。

这里的过程是 while 循环连续运行(使用 cou 测试 1 次迭代),检查 dbsize 并使用异步过程处理一批键。

async def main():
    try:
        # Connect Redis
        r = redis.StrictRedis(host='localhost', port=6379, db=1)
        loop = asyncio.get_event_loop()
        
        # Testing with cou = 1 for 1 iteration, this should be "while True:" or similar long running condition.
        cou = 1
        while(cou > 0):
            keyNum = r.dbsize()
            if(keyNum > 0):
                # We have keys. Process!
                for key in r.scan_iter(count=10):
                    taskStr = r.get(key)
                    tasks = json.loads(taskStr)
                    if isinstance(tasks, list):
                        for t in tasks:            
                            await asyncio.create_task(async_process(t))                            
                    else:
                        await asyncio.create_task(async_process(tasks))                            
                    r.delete(key)
            else:
                # We should wait for a bit and check again
                await asyncio.sleep(5)
            cou = cou - 1

        # We should close now.        
        logger.info("Done processing data. Closing.")        
    except:
        logger.error("({})".format(traceback.format_exc()))
        logger.error("Error in MAIN. We should exit.")
    finally:
        loop.stop()

现在我在测试这段代码时的期望是 scan_iter 会给我一批 10 个键,这些键将被处理,并且 cou 将递减并退出 while 循环。但它返回了所有内容,并且所有内容都立即得到处理。

我的问题:

  1. scan_iter 是否会出现这种行为?有没有办法强制 scan_iter 只返回特定数量的键?

  2. 如果 Redis 中有数百万个键,这种行为不会导致性能问题吗?如何避免这种性能下降?

  3. 有没有更优雅的方式来处理 Redis 中传入数据的批处理?

4

0 回答 0