
Suppose I want to use dnspython to send various types of DNS queries (A, AAAA, NS, SOA, DNSKEY, NSEC3, DS, etc.) for each of the Alexa top 1M sites.

Doing this one query at a time takes a while, because I send several queries per site. So I would like to get some parallelism using asyncio in Python 3.
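To make the workload concrete: every site gets one query per record type, so the work list is the cross product of domains and record types. A minimal sketch, assuming the domain list is already available as an iterable of names; the helper name build_queries is illustrative and not part of dnspython.

```python
from itertools import product
from typing import Iterable, Iterator, Tuple

# Record types listed in the question.
RECORD_TYPES = ("A", "AAAA", "NS", "SOA", "DNSKEY", "NSEC3", "DS")


def build_queries(domains: Iterable[str],
                  rtypes: Iterable[str] = RECORD_TYPES) -> Iterator[Tuple[str, str]]:
    """Yield one (domain, record type) pair per combination.

    Illustrative helper (not a dnspython API). The pairs are produced
    lazily, although product() itself reads all of `domains` into memory
    first. For each domain, every record type is yielded before moving
    on to the next domain.
    """
    return product(domains, rtypes)


if __name__ == "__main__":
    for query in build_queries(["google.com"], ("A", "AAAA")):
        print(query)  # ('google.com', 'A') then ('google.com', 'AAAA')
```

With 1M domains and the seven types above that is roughly 7M queries, which is why it can pay to iterate over the pairs instead of building them all into one list.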

I have gone through David Beazley's generator/coroutine trilogy (http://www.dabeaz.com/talks.html), but I am still not sure how to accomplish my simple task...

More specifically,

results = dns.resolver.query('google.com','AAAA')

is a blocking function call that waits until the DNS reply comes back.

How can I send other queries during this waiting time without using threads? Since DNS queries are usually UDP packets, I think asyncio might help.
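The idea behind the question can be shown without any DNS traffic at all: while one coroutine is awaiting, the event loop runs the others, so the waits overlap instead of adding up. A minimal sketch in which asyncio.sleep stands in for waiting on a UDP reply; fake_lookup is a hypothetical placeholder, not a dnspython call.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_lookup(domain: str, rtype: str, delay: float = 0.2) -> Tuple[str, str]:
    # Hypothetical stand-in for a DNS round trip: awaiting here hands
    # control back to the event loop, which can run other lookups meanwhile.
    await asyncio.sleep(delay)
    return domain, rtype


async def run_concurrently(queries: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    # gather() schedules every coroutine at once, waits for all of them,
    # and returns the results in the same order as the inputs.
    return await asyncio.gather(*(fake_lookup(d, t) for d, t in queries))


if __name__ == "__main__":
    qs = [("google.com", "A"), ("google.com", "AAAA"), ("google.com", "NS")]
    start = time.perf_counter()
    results = asyncio.run(run_concurrently(qs))
    elapsed = time.perf_counter() - start
    # Total time is close to a single delay, not three delays in a row.
    print(results, f"{elapsed:.2f}s")
```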

pycares does not seem to support all the record types I need, so pycares and aiodns will not work in my case.

Any references and ideas would be helpful.


1 Answer


Recent versions of dnspython (2.0 and later) have native AsyncIO support, although the documentation for it is a bit sparse.

Even so, it is now possible to make queries with dnspython without resorting to a hacky thread-pool workaround.

AsyncIO DNSPython Example

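Below is a simple example using dnspython's asyncio resolver class (dns.asyncresolver.Resolver). It wraps each lookup in a helper coroutine and uses asyncio.gather to run bulk queries efficiently: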

from dns.asyncresolver import Resolver
import dns.resolver
import dns.rrset
import asyncio
from typing import Tuple


async def dns_query(domain: str, rtype: str = 'A', **kwargs) -> dns.rrset.RRset:
    kwargs, res_cfg = dict(kwargs), {}
    # extract 'filename' and 'configure' from kwargs if they're present
    # to be passed to Resolver. we pop them to avoid conflicts passing kwargs
    # to .resolve().
    if 'filename' in kwargs: res_cfg['filename'] = kwargs.pop('filename')
    if 'configure' in kwargs: res_cfg['configure'] = kwargs.pop('configure')

    # create an asyncio Resolver instance
    rs = Resolver(**res_cfg)

    # call and asynchronously await .resolve() to obtain the DNS results
    res: dns.resolver.Answer = await rs.resolve(domain, rdtype=rtype, **kwargs)

    # we return the most useful part of Answer: the RRset, which contains
    # the individual records that were found.
    return res.rrset


async def dns_bulk(*queries: Tuple[str, str], **kwargs):
    ret_ex = kwargs.pop('return_exceptions', True)

    # Create (but don't await) one dns_query coroutine per query.
    # A coroutine object does nothing until it is awaited, either
    # individually or in bulk via asyncio.gather.
    coros = [dns_query(dom, rt, **kwargs) for dom, rt in queries]

    # using asyncio.gather, we can effectively run all of the coroutines
    # in 'coros' at the same time, instead of awaiting them one-by-one.
    #
    # return_exceptions controls whether gather() should immediately
    # fail and re-raise as soon as it detects an exception,
    # or whether it should just capture any exceptions, and simply
    # return them within the results.
    #
    # in this example function, return_exceptions is set to True,
    # which means if one or more of the queries fail, it'll simply
    # store the exceptions and continue running the remaining coros,
    # and return the exceptions inside of the tuple/list of results.
    return await asyncio.gather(*coros, return_exceptions=ret_ex)


async def main():
    queries = [
        ('privex.io', 'AAAA'),
        ('privex.io', 'TXT'),
        ('google.com', 'A'),
        ('google.com', 'AAAA'),
        ('examplesitedoesnotexist.test', 'A'),
    ]
    print(f"\n [...] Sending {len(queries)} bulk queries\n")
    res = await dns_bulk(*queries)
    print(f"\n [+++] Got {len(res)} results! :)\n\n")

    for i, a in enumerate(res):
        print("\n------------------------------------------------------------\n")
        if isinstance(a, Exception):
            print(f" [!!!] Error: Result {i} is an exception! Original query: {queries[i]} || Exception is: {type(a)} - {a!s} \n")
            continue
        print(f" [+++] Got result for query {i} ( {queries[i]} )\n")
        print(f"  >>>  Representation: {a!r}")
        print(f"  >>>  As string:")
        print(f"    {a!s}")
        print()
    print("\n------------------------------------------------------------\n")

asyncio.run(main())

Here is the output from running the script above:


 [...] Sending 5 bulk queries


 [+++] Got 5 results! :)



------------------------------------------------------------

 [+++] Got result for query 0 ( ('privex.io', 'AAAA') )

  >>>  Representation: <DNS privex.io. IN AAAA RRset: [<2a07:e00::abc>]>
  >>>  As string:
    privex.io. 221 IN AAAA 2a07:e00::abc


------------------------------------------------------------

 [+++] Got result for query 1 ( ('privex.io', 'TXT') )

  >>>  Representation: <DNS privex.io. IN TXT RRset: [<"v=spf1 include:spf.messagingengine.com include:smtp.privex.io -all">, <"google-site-verification=_0OlLdacq3GAc4NkhOd0pBcLsNya3KApS0iAc6MtbYU">]>
  >>>  As string:
    privex.io. 300 IN TXT "v=spf1 include:spf.messagingengine.com include:smtp.privex.io -all"
privex.io. 300 IN TXT "google-site-verification=_0OlLdacq3GAc4NkhOd0pBcLsNya3KApS0iAc6MtbYU"


------------------------------------------------------------

 [+++] Got result for query 2 ( ('google.com', 'A') )

  >>>  Representation: <DNS google.com. IN A RRset: [<216.58.205.46>]>
  >>>  As string:
    google.com. 143 IN A 216.58.205.46


------------------------------------------------------------

 [+++] Got result for query 3 ( ('google.com', 'AAAA') )

  >>>  Representation: <DNS google.com. IN AAAA RRset: [<2a00:1450:4009:80f::200e>]>
  >>>  As string:
    google.com. 221 IN AAAA 2a00:1450:4009:80f::200e


------------------------------------------------------------

 [!!!] Error: Result 4 is an exception! Original query: ('examplesitedoesnotexist.test', 'A') || Exception is: <class 'dns.resolver.NXDOMAIN'> - The DNS query name does not exist: examplesitedoesnotexist.test. 


------------------------------------------------------------
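One thing to watch when scaling this to a list like the Alexa top 1M: dns_query above builds a new Resolver (re-reading the system resolver configuration) for every single query, and dns_bulk starts every query at the same moment. A sketch of one way to handle both, assuming a single shared dns.asyncresolver.Resolver plus an asyncio.Semaphore that caps how many queries are in flight. The names bounded_bulk and make_resolve and the default limit of 100 are illustrative choices, not part of dnspython. The lookup is passed in as a plain coroutine function so the concurrency logic can be exercised without network access.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple

# Any coroutine function taking (domain, rtype); illustrative type alias.
ResolveFn = Callable[[str, str], Awaitable[object]]


async def bounded_bulk(queries: Iterable[Tuple[str, str]],
                       resolve: ResolveFn, limit: int = 100) -> List[object]:
    """Run resolve(domain, rtype) for every query, at most `limit` at a time.

    Illustrative helper. Like gather(..., return_exceptions=True), failures
    are returned in place of results instead of being raised.
    """
    sem = asyncio.Semaphore(limit)

    async def one(domain: str, rtype: str) -> object:
        async with sem:  # wait here while `limit` queries are in flight
            try:
                return await resolve(domain, rtype)
            except Exception as exc:  # e.g. NXDOMAIN, NoAnswer, timeouts
                return exc

    return await asyncio.gather(*(one(d, t) for d, t in queries))


def make_resolve(lifetime: float = 5.0) -> ResolveFn:
    """Illustrative helper: a resolve() backed by one shared dnspython resolver."""
    import dns.asyncresolver  # imported lazily; requires dnspython >= 2.0

    resolver = dns.asyncresolver.Resolver()  # reads the system config once
    resolver.lifetime = lifetime  # overall time budget per query, in seconds

    async def resolve(domain: str, rtype: str):
        answer = await resolver.resolve(domain, rtype)
        return answer.rrset

    return resolve


if __name__ == "__main__":
    qs = [("google.com", "A"), ("google.com", "AAAA"), ("examplesitedoesnotexist.test", "A")]
    results = asyncio.run(bounded_bulk(qs, make_resolve(), limit=50))
    for query, result in zip(qs, results):
        print(query, result)
```

Note that gather() still creates one task per query up front. For millions of queries, a fixed pool of worker tasks pulling from a queue keeps memory use flat.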

Background Tasks with AsyncIO

If your application is purely AsyncIO, you can run coroutines in the background without needing threads:

import asyncio

async def hello():
    for i in range(10):
        print("hello world")
        await asyncio.sleep(2.0)

async def lorem():
    for i in range(20):
        print("lorem ipsum dolor")
        await asyncio.sleep(1.0)


async def my_app():
    print(" [...] creating tsk_hello and tsk_lorem")
    tsk_hello = asyncio.create_task(hello())
    tsk_lorem = asyncio.create_task(lorem())

    # let them both run for 5 seconds
    print(" [...] waiting 5 secs")
    await asyncio.sleep(5.0)

    # now, assuming you wanted to cancel a looping task before it's finished
    # (or tasks that are endless 'while True' loops)
    # we can use the tsk_x task objects to ask them to stop immediately.
    print(" [...] stopping tsk_hello")
    tsk_hello.cancel()
    print(" [...] waiting 4 secs")
    await asyncio.sleep(4.0)
    print(" [...] stopping tsk_lorem")
    tsk_lorem.cancel()

asyncio.run(my_app())

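If you run the AsyncIO background-task example above, the output will look like the following, showing that lorem and hello both run side by side, alongside the main entry-point function: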

 [...] waiting 5 secs
hello world
lorem ipsum dolor
lorem ipsum dolor
hello world
lorem ipsum dolor
lorem ipsum dolor
hello world
lorem ipsum dolor
 [...] stopping tsk_hello
 [...] waiting 4 secs
lorem ipsum dolor
lorem ipsum dolor
lorem ipsum dolor
lorem ipsum dolor
 [...] stopping tsk_lorem
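The same create_task() pattern can drive a large DNS crawl: start a fixed number of background worker tasks that pull (domain, record type) pairs from an asyncio.Queue, so only a bounded number of queries exist at any moment, however long the input list is. A minimal sketch, assuming resolve is any coroutine function taking (domain, rtype), for example one that awaits dns.asyncresolver.Resolver().resolve(domain, rtype). The names crawl and worker and the default worker count are illustrative, not a library API.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, Tuple

Query = Tuple[str, str]
ResolveFn = Callable[[str, str], Awaitable[object]]


async def crawl(queries: Iterable[Query], resolve: ResolveFn,
                workers: int = 50) -> Dict[Query, object]:
    """Illustrative worker pool: `workers` background tasks share one queue."""
    # A bounded queue means the producer never holds more than
    # 2 * workers pending queries in memory.
    queue: asyncio.Queue = asyncio.Queue(maxsize=workers * 2)
    results: Dict[Query, object] = {}  # duplicate queries overwrite each other

    async def worker() -> None:
        while True:  # endless loop, stopped with cancel() below
            query = await queue.get()
            try:
                results[query] = await resolve(*query)
            except Exception as exc:  # record the failure, keep the worker alive
                results[query] = exc
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    for query in queries:
        await queue.put(query)  # blocks while the queue is full
    await queue.join()  # wait until every queued query has been processed
    for task in tasks:
        task.cancel()  # workers are now idle in queue.get(); stop them
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    async def demo_resolve(domain: str, rtype: str) -> str:
        # Hypothetical stand-in for a real DNS lookup.
        await asyncio.sleep(0.1)
        return f"{domain} {rtype} ok"

    print(asyncio.run(crawl([("a.test", "A"), ("b.test", "AAAA")], demo_resolve, workers=2)))
```

Cancelling the workers mirrors tsk_hello.cancel() above: they are endless loops, so they are stopped explicitly once the queue reports that all work is done.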
answered 2021-03-20T07:30:04.023