While trying to execute some async functions in parallel, I keep getting an error that I would like to understand.

I'm running asyncio on Python 3.5.1 and combining aiohttp (through ClientSession) with aiopg (asynchronous psycopg2) calls.

The main idea is to read rows from one table, loop over them, and execute several calls to the function row_loop in parallel; each call asynchronously starts a GET web request and then, through another cursor, writes the result for that row back to the same DB.

ensure_future() and gather() seem to work properly, but only the first DB write goes through; the others raise an exception complaining that another coroutine is already waiting for incoming data:
Traceback (most recent call last):
File "sandbox/loop.py", line 148, in write_dict
await cur2.execute(INSERT, (tuple(adict.values()),))
File "/Users/mrigal/.virtualenvs/py35/lib/python3.5/site-packages/aiopg/cursor.py", line 103, in execute
waiter = self._conn._create_waiter('cursor.execute')
File "/Users/mrigal/.virtualenvs/py35/lib/python3.5/site-packages/aiopg/connection.py", line 211, in _create_waiter
'data' % func_name)
RuntimeError: cursor.execute() called while another coroutine is already waiting for incoming data
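For reference, here is a stripped-down script (connection string and query are hypothetical, not from my real code) that, as far as I understand aiopg, reproduces the same error: two coroutines calling cursor.execute() concurrently on cursors of one shared connection.

import asyncio
import aiopg

DSN = 'dbname=test user=postgres'  # hypothetical connection string

async def one_query(conn):
    async with conn.cursor() as cur:
        # any statement that keeps the connection busy for a moment
        await cur.execute('SELECT pg_sleep(1)')

async def repro():
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            # two concurrent execute() calls on the SAME connection:
            # the second one hits the RuntimeError above
            await asyncio.gather(one_query(conn), one_query(conn))

asyncio.get_event_loop().run_until_complete(repro())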
The issue could be in the aiopg library, or it could be that I am registering the loop against main() and not against the function where gather() actually happens, but I could find very little documentation about this.

Without ensure_future() and gather() the code is slow, since each call is made one after the other. I may not have fully understood the purpose of gather(), and I might need a real multithreading solution, but I would like to test this intermediate step first.
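One intermediate idea I might also try is to keep the GET requests concurrent but serialize only the writes on the shared connection, for instance with an asyncio.Lock around a variant of my write_dict() below (just a sketch, the lock is not in my real code):

write_lock = asyncio.Lock()

async def write_dict_locked(conn, adict):
    # only one coroutine at a time may drive the shared connection
    async with write_lock:
        async with conn.cursor() as cur:
            await cur.execute(INSERT_QUERY, (tuple(adict.values()),))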
Here is my actual code:
import asyncio

import aiohttp
import aiopg
from psycopg2.extras import DictCursor

# URL, SELECT_QUERY, INSERT_QUERY, build_adict and OverQueryLimitException
# are defined elsewhere in the module

async def make_get(row, session, spec_country_code=None):
    result = await session.get(URL, country=spec_country_code)
    if not spec_country_code and result.country != row.get('country'):
        return await make_get(row, session, spec_country_code=result.country)
    return result

async def write_dict(conn, adict):
    async with conn.cursor() as cur2:
        await cur2.execute(INSERT_QUERY, (tuple(adict.values()),))
async def row_loop(conn, row, session):
    result = await make_get(row=row, session=session)
    if result.status == 'OVER_QUERY_LIMIT':
        raise OverQueryLimitException()
    else:
        adict = build_adict(row, result)
        await write_dict(conn=conn, adict=adict)
    return result.status
async def write_loop(conn):
    failed_count = 0
    rows = []
    async with aiohttp.ClientSession() as session:
        async with conn.cursor(cursor_factory=DictCursor) as cur:
            await cur.execute(SELECT_QUERY)
            async for row in cur:
                # THIS WORKS, BUT I WOULD LIKE TO USE gather()
                # try:
                #     status = await row_loop(conn=conn, row=row, session=session)
                # except OverQueryLimitException:
                #     break
                # if status != 'OK':
                #     failed_count += 1
                rows.append(asyncio.ensure_future(
                    row_loop(conn=conn, row=row, session=session)))
            responses = await asyncio.gather(*rows)
            print(len(responses))  # just a check; gather() returns a plain list
    return cur.rownumber, failed_count
def print_result(mode, select_count, failed_count):
    print("Tried to {} {} new entries".format(mode, select_count))
    print("Found {} failed/skipped entries".format(failed_count))

async def insert_new(conn):
    select_count, failed_count = await write_loop(conn=conn)
    print_result('insert', select_count, failed_count)

async def main():
    async with aiopg.create_pool('db_string') as pool:
        async with pool.acquire() as conn:
            await insert_new(conn=conn)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
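For completeness, the other variant I plan to test is to pass the pool down instead of a single connection and acquire a short-lived connection per write, so that no two coroutines ever share one aiopg connection (sketch, untested):

async def write_dict(pool, adict):
    # each write borrows its own connection from the pool, so concurrent
    # row_loop() tasks never share a single aiopg connection
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(INSERT_QUERY, (tuple(adict.values()),))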