我有一个循环遍历 IP 地址列表并返回有关它们的一些信息的过程。简单的 for 循环效果很好,由于 Python 的全局解释器锁 (GIL),我的问题是大规模运行它。
我的目标是让这个函数并行运行并充分利用我的 4 个内核。这样,当我运行其中的 100K 时,我不会通过正常的 for 循环花费 24 小时。
在这里阅读其他答案后,尤其是这个答案,如何并行化一个简单的 Python 循环?,我决定使用joblib。当我通过它运行 10 条记录时(上面的示例),运行时间超过 10 分钟。这听起来不像工作正常。我知道我做错了什么或不理解。任何帮助是极大的赞赏!
import pandas as pd
import numpy as np
import os as os
from ipwhois import IPWhois
from joblib import Parallel, delayed
import multiprocessing
num_core = multiprocessing.cpu_count()
iplookup = ['174.192.22.197',\
'70.197.71.201',\
'174.195.146.248',\
'70.197.15.130',\
'174.208.14.133',\
'174.238.132.139',\
'174.204.16.10',\
'104.132.11.82',\
'24.1.202.86',\
'216.4.58.18']
正常的for循环可以正常工作!
asn=[]
asnid=[]
asncountry=[]
asndesc=[]
asnemail = []
asnaddress = []
asncity = []
asnstate = []
asnzip = []
asndesc2 = []
ipaddr=[]
b=1
totstolookup=len(iplookup)
for i in iplookup:
i = str(i)
print("Running #{} out of {}".format(b,totstolookup))
try:
obj=IPWhois(i,timeout=15)
result=obj.lookup_whois()
asn.append(result['asn'])
asnid.append(result['asn_cidr'])
asncountry.append(result['asn_country_code'])
asndesc.append(result['asn_description'])
try:
asnemail.append(result['nets'][0]['emails'])
asnaddress.append(result['nets'][0]['address'])
asncity.append(result['nets'][0]['city'])
asnstate.append(result['nets'][0]['state'])
asnzip.append(result['nets'][0]['postal_code'])
asndesc2.append(result['nets'][0]['description'])
ipaddr.append(i)
except:
asnemail.append(0)
asnaddress.append(0)
asncity.append(0)
asnstate.append(0)
asnzip.append(0)
asndesc2.append(0)
ipaddr.append(i)
except:
pass
b+=1
传递给joblib以在所有内核上运行的函数!
def run_ip_process(iplookuparray):
asn=[]
asnid=[]
asncountry=[]
asndesc=[]
asnemail = []
asnaddress = []
asncity = []
asnstate = []
asnzip = []
asndesc2 = []
ipaddr=[]
b=1
totstolookup=len(iplookuparray)
for i in iplookuparray:
i = str(i)
print("Running #{} out of {}".format(b,totstolookup))
try:
obj=IPWhois(i,timeout=15)
result=obj.lookup_whois()
asn.append(result['asn'])
asnid.append(result['asn_cidr'])
asncountry.append(result['asn_country_code'])
asndesc.append(result['asn_description'])
try:
asnemail.append(result['nets'][0]['emails'])
asnaddress.append(result['nets'][0]['address'])
asncity.append(result['nets'][0]['city'])
asnstate.append(result['nets'][0]['state'])
asnzip.append(result['nets'][0]['postal_code'])
asndesc2.append(result['nets'][0]['description'])
ipaddr.append(i)
except:
asnemail.append(0)
asnaddress.append(0)
asncity.append(0)
asnstate.append(0)
asnzip.append(0)
asndesc2.append(0)
ipaddr.append(i)
except:
pass
b+=1
ipdataframe = pd.DataFrame({'ipaddress':ipaddr,
'asn': asn,
'asnid':asnid,
'asncountry':asncountry,
'asndesc': asndesc,
'emailcontact': asnemail,
'address':asnaddress,
'city':asncity,
'state': asnstate,
'zip': asnzip,
'ipdescrip':asndesc2})
return ipdataframe
通过 joblib 使用所有内核运行进程
Parallel(n_jobs=num_core)(delayed(run_ip_process)(iplookuparray) for i in iplookup)