我正在使用电机,但 pymongo 是我最初的选择,后来改用电机,因为它是 python 中 mongodb 的异步版本。
我的目标是同时以最少的等待时间查询具有大量调用的 mongodb。
大约有 1000 个符号,对于每个符号,我必须不时从 mongodb 查询其最新的烛台数据,以便执行某些计算。我需要查询每个符号的最新 5K 文档。所以该集合大约包含 1000 * 5000 = 5,000,000 个文档。
使用 Motor 和 asyncio,我使用以下方法异步获取文档,但是运行代码需要很长时间,我似乎不知道为什么。我在虚拟机上使用 8 核 cpu。
对这个问题有什么帮助吗?
async def getCandleList(symbol): # each symbol contains about 5K latest candles in the collection
final_str = "{'symbol': '%s'}"%(symbol)
resultType = 'candlestick_archive'
dbName = 'candle_db'
cursor = eval("db.{}.find({}).sort('timeStamp',-1)".format(dbName, final_str))
finalList = await cursor.to_list(length=None)
return finalList
async def taskForEachSymbol(symbol):
while True:
candleList = await getCandleList(symbol)
await generateSignal(candleList) # a function that generates certain signals in real time
def getAllTasks():
awaitableTasks = []
for symbol in symbolList: # symbolList contains around 1k symbols
awaitableTasks.append(asyncio.create_task(taskForEachSymbol(symbol)))
return awaitableTasks
async def mainTask():
awaitableTasks = getAllTasks()
await asyncio.gather(*awaitableTasks, return_exceptions=False)
async def main()
mainLoop.run_until_complete(mainTask())
print('completed! ... ')
if __name__ == '__main__':
mainLoop=asyncio.new_event_loop()
asyncio.set_event_loop(mainLoop)
client = motor.motor_asyncio.AsyncIOMotorClient(io_loop=mainLoop)
db = client.candles
main()