3

我有一些带有 http 接口的设备,它们经常生成带有我想要解析并保存到数据库的值的无限 http 页面。我从请求开始:

import asyncio
import asyncpg
import requests

class node_http_mtr():
    def __init__(self, ip, nsrc, ndst):
        self.ip = ip
        self.nsrc = nsrc
        self.ndst = ndst
        try:
            self.data = requests.get('http://' + self.ip + '/nph-cgi_mtr?duration=-1&interval=0', stream=True, timeout=10)
        except:
            return

    def __iter__(self):
        return self

    def __next__(self):
        mtr = list()
        try:
            for chunk in self.data.iter_content(32 * (self.nsrc + self.ndst), '\n'):
                # DEBUG log chunk
                for line in chunk.split('\n'):
                    # DEBUG log line
                    if line.startswith('MTR'):
                        try:
                            _, io, num, val = line.split(' ')
                            l, r = val.split(':')[1], val.split(':')[2]
                            mtr.append((self.ip, io+num, l, r))
                        except:
                            # ERROR log line
                            pass
                        if len(mtr) == self.nsrc + self.ndst:
                            break
                if len(mtr) == self.nsrc + self.ndst:
                    yield mtr
                else:
                    continue
        except:
            # ERROR connection lost
            return


async def save_to_db(data_to_save):
    global pool
    try:
        async with pool.acquire() as conn:
            await conn.execute('''INSERT INTO mtr (ip, io, l, r) VALUES %s''' % ','.join(str(row) for row in data_to_save))
    finally:
        await pool.release(conn)


async def remove_from_db(ip):
    global pool
    try:
        async with pool.acquire() as conn:
            await conn.execute('''DELETE FROM httpmtr WHERE ip = $1''', ip)
    finally:
        await pool.release(conn)


async def http_mtr_worker():
    global workers_list
    global loop
    while True:
        await asyncio.sleep(0)
        for ip in list(workers_list):
            data_to_save = next(workers_list[ip])
            if data_to_save:
                asyncio.ensure_future(save_to_db(next(data_to_save)))
            await asyncio.sleep(0)


async def check_for_workers():
    global workers_list
    global pool
    while True:
        await asyncio.sleep(0)
        try:
            async with pool.acquire() as conn:
                workers = await conn.fetch('''SELECT ip FROM httpmtr''')
        finally:
            await pool.release(conn)
        for worker in workers:
            if worker['ip'] not in list(workers_list):
                workers_list[worker['ip']] = node_http_mtr(worker['ip'], 8, 8)
                await asyncio.sleep(0)
                print('Add worker', worker['ip'])
            await asyncio.sleep(0)
        ips_to_delete = set(workers_list.keys()) - set([i[0] for i in workers])
        if len(ips_to_delete) != 0:
            for ip in ips_to_delete:
                print('Delete worker ', ip)
                workers_list.pop(ip)
                await asyncio.sleep(0)


async def make_db_connection():
    pool = await asyncpg.create_pool(user='postgres', password='data', database='test', host='localhost', max_queries=50000, command_timeout=60)
    return pool


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pool = loop.run_until_complete(make_db_connection())
workers_list = {}
try:
    asyncio.ensure_future(check_for_workers())
    asyncio.ensure_future(http_mtr_worker())
    loop.run_forever()
except Exception as e:
    print(e)
    pass
finally:
    print("Closing Loop")
    loop.close()

我在 DB 中触发了程序,删除了所有早于 1 秒的数据,PostgreSQL 中一名工作人员的最终结果是:

test=# select count(*) from mtr;
 count
-------
   384
(1 ёЄЁюър)

这意味着每秒 384 个结果。每个设备中有 16 种不同类型的数据,所以我每秒有 384/16 = 24 个值。是合适的结果。但是我添加的工人越多,我的表现就越差:如果有 10 个工人,我的价值会少 2-3 倍。目标是拥有数百名工人和 24-25 个值/秒。接下来我尝试做的是使用aiohttp。我希望得到更好的结果。匆忙写了测试代码:

import asyncio
from aiohttp import ClientSession
import asyncpg

async def parse(line):
    if line.startswith('MTR'):
        _, io, num, val = line.split(' ')
        l, r = val.split(':')[1], val.split(':')[2]
    return ('ip.will.be.here', io + num, l, r)

async def run():
    url = "http://10.150.20.130/nph-cgi_mtr?duration=-1&interval=0"
    async with ClientSession() as session:
        while True:
            async with session.get(url) as response:
                buffer = b''
                start = False
                async for line in response.content.iter_any():
                    if line.startswith(b'\n'):
                        start = True
                        buffer += line
                    elif start and line.endswith(b'\n'):
                        buffer += line
                        mtr = [await parse(line) for line in buffer.decode().split('\n')[1:-1]]
                        await save_to_db(mtr)
                        break
                    elif start:
                        buffer += line


async def make_db_connection():
    pool = await asyncpg.create_pool(user='postgres', password='data', database='test', host='localhost', max_queries=50000, command_timeout=60)
    return pool


async def save_to_db(data_to_save):
    global pool
    try:
        async with pool.acquire() as conn:
            await conn.execute('''INSERT INTO mtr (ip, io, l, r) VALUES %s''' % ','.join(str(row) for row in data_to_save))
    finally:
        await pool.release(conn)


loop = asyncio.get_event_loop()
pool = loop.run_until_complete(make_db_connection())
future = asyncio.ensure_future(run())
loop.run_until_complete(future)

我有这个:

test=# select count(*) from mtr;
 count
-------
    80
(1 ёЄЁюър)

即异步请求的性能要差 5 倍。我卡住了。我不明白如何解决它。

更新。剖析并没有使情况更清楚。

请求: 请求配置文件 aiohttp: aiohttp 配置文件

有了请求,情况或多或少就清楚了。但是我完全不明白异步 aiohttp 有什么问题。

2018 年 5 月 16 日更新。最后我回到了多线程,我得到了我需要的东西——大量工人的稳定性能。异步调用确实不是灵丹妙药。

4

0 回答 0