0

我正在使用电机,但 pymongo 是我最初的选择,后来改用电机,因为它是 python 中 mongodb 的异步版本。

我的目标是同时以最少的等待时间查询具有大量调用的 mongodb。

大约有 1000 个符号,对于每个符号,我必须不时从 mongodb 查询其最新的烛台数据,以便执行某些计算。我需要查询每个符号的最新 5K 文档。所以该集合大约包含 1000 * 5000 = 5,000,000 个文档。

使用 Motor 和 asyncio,我使用以下方法异步获取文档,但是运行代码需要很长时间,我似乎不知道为什么。我在虚拟机上使用 8 核 cpu。

对这个问题有什么帮助吗?

async def getCandleList(symbol): # each symbol contains about 5K latest candles in the collection

    final_str = "{'symbol': '%s'}"%(symbol)
    resultType = 'candlestick_archive'
    dbName = 'candle_db'
    cursor = eval("db.{}.find({}).sort('timeStamp',-1)".format(dbName, final_str))
    finalList = await cursor.to_list(length=None)
    return finalList


async def taskForEachSymbol(symbol):

    while True:
        candleList = await getCandleList(symbol) 
        await generateSignal(candleList) # a function that generates certain signals in real time



def getAllTasks():
    awaitableTasks = []

    for symbol in symbolList: # symbolList contains around 1k symbols
        awaitableTasks.append(asyncio.create_task(taskForEachSymbol(symbol)))
    return awaitableTasks



async def mainTask():
    awaitableTasks = getAllTasks()
    await asyncio.gather(*awaitableTasks, return_exceptions=False)


async def main()
    mainLoop.run_until_complete(mainTask())
    print('completed! ... ')





if __name__ == '__main__':
    mainLoop=asyncio.new_event_loop()
    asyncio.set_event_loop(mainLoop)

    client = motor.motor_asyncio.AsyncIOMotorClient(io_loop=mainLoop)
    db = client.candles

    main()
4

0 回答 0