0

我有一个 pandas 数据框,其中有一列包含每个电子邮件地址的主机名(超过 1000 行):

email               hostname
email@example.com   example.com
email2@example.com  example.com
email@example2.com  example2.com
email@example3.com  example3.com

我想检查每个主机名并检查它是否真的存在。

email               hostname      valid_hostname
email@example.com   example.com   True
email2@example.com  example.com   False
email@example2.com  example2.com  False
email@example3.com  example3.com  False

首先,我提取了每个电子邮件地址的主机名:

df['hostname'] = df['email'].str.split('@').str[1]

然后,我尝试使用 来检查 DNS pyIsEmail,但这太慢了

from pyisemail import is_email    
df['valid_hostname'] = df['hostname'].apply(lambda x: is_email(x, check_dns=True))

然后,我尝试了一个多线程函数:

import requests
from requests.exceptions import ConnectionError

def validate_hostname_existence(hostname:str):
    try:
        response = requests.get(f'http://{hostname}', timeout=0.5)
    except ConnectionError:
        return False
    else:
        return True

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
        df['valid_hostname'] = pd.Series(executor.map(validate_hostname_existence, df['hostname']),index=df['hostname'].index)

但这也不太顺利,因为我对并行函数还很陌生。它有多个错误,我相信如果我能以某种方式首先检查该主机名是否已被检查并再次跳过整个请求,那会更有益。我想在不发送电子邮件的情况下尽我所能。

有没有图书馆或方法可以做到这一点?因为到目前为止我找不到解决这个问题的合适方法。

4

2 回答 2

0

您回答了自己,您可以使用缓存将您已经检查过的主机名保存在内存中。

例如:

   from functools import lru_cache
   @lru_cache(max_size=None) 
   def my_is_email(x, check_dns=True):
       return is_email(x, check_dns=check_dns)

还建议限制大小以防止内存溢出。例如:

@lru_cache(max_size=256) 

欲了解更多信息,请阅读

于 2021-11-22T08:21:05.653 回答
0

我对熊猫一无所知,但这里是您可以并行处理电子邮件列表并取回一组有效电子邮件的方法。我相信你可以把它适应你的熊猫案。

from queue import Empty, Queue
from threading import Thread, Lock
from pyisemail import is_email

q = Queue()
lock = Lock()
valid = set()

emails = ["test@google.com", "test2@gmail.com"]
for e in emails:
    q.put(e)


def process_queue(queue: Queue):
    while True:
        try:
            email = queue.get(block=False)
        except Empty:
            break
        if is_email(email, check_dns=True):
            lock.acquire()
            valid.add(email)
            lock.release()


NUM_THREADS = 30
threads = []

for i in range(NUM_THREADS):
    thread = Thread(target=process_queue, args=(q,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print("done")
print(valid)

解释

  1. 创建一个填充电子邮件的队列对象
  2. 创建 NUM_THREADS 个线程。
  3. 每个线程从队列中拉取。如果他们收到电子邮件,它会处理电子邮件。锁定保护结果集的锁。添加到集合中。发布。如果没有留下任何电子邮件,则线程终止。
  4. 等待所有线程终止。
于 2021-11-22T20:59:46.747 回答