
I am trying to capture Lighthouse metrics for 7k URLs with the Google PageSpeed Insights API, without using an API key. I wrote a Python script to run this process. With this approach I can capture data for the first 1,000 URLs. The script runs over all 7k URLs without raising any errors, but it stops writing data after 1,000 URLs. Do I need an API key or some permission to run this process against a large number of URLs? Is there any prerequisite for fetching data for 7k URLs? I even added multithreading to the script to speed up the process.
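For reference, my understanding is that the same endpoint accepts an API key through the standard Google APIs `key` query parameter. The sketch below shows how I would attach one; the key value and URL are placeholders, not real values:

import requests

PSI_API_KEY = 'YOUR_API_KEY'  # placeholder, not a real credential
url = 'https://example.com/'
device = 'mobile'
endpoint = (f'https://www.googleapis.com/pagespeedonline/v5/runPagespeed'
            f'?url={url}&strategy={device}&key={PSI_API_KEY}')
resp = requests.get(endpoint)
print(resp.status_code)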

Here is the Python script:

import itertools
import threading
import time
from datetime import datetime
from threading import Event
from time import sleep

import pandas as pd
import requests

start = time.perf_counter()
urls = df_final['URL'].unique().tolist()  # df_final already holds the 7k URLs
devices = ['desktop', 'mobile']
csv_lock = threading.Lock()  # guards concurrent appends to the output CSV
def role_session(url, device):

    # One row of Lighthouse metrics per (url, device) pair.
    lighthouse = pd.DataFrame(columns=['Date', 'URL', 'First Contentful Paint', 'Time to Interactive', 'Cumulative layout Shift', 'First Meaningful Paint', 'Largest Contentful Paint', 'Speed Index', 'Total Blocking Time', 'Java Execution Time', 'Remove Unused JavaScript', 'Server Initial Response Time', 'DOM_size', 'device'])

    api = f'https://www.googleapis.com/pagespeedonline/v5/runPagespeed?url={url}&strategy={device}'
    r = ''
    while r == '':  # retry the request until it succeeds
        try:
            r = requests.get(api, verify=True)
            final = r.json()
            date = str(datetime.today().date())
            urlid = final['id']
            ID = str(urlid.split('?')[0])  # strip any query-string parameters from the reported URL
            audits = final['lighthouseResult']['audits']
            FCP = str(audits['first-contentful-paint']['displayValue'])
            TTI = str(audits['interactive']['displayValue'])
            CLS = str(audits['cumulative-layout-shift']['displayValue'])
            FMP = str(audits['first-meaningful-paint']['displayValue'])
            LCP = str(audits['largest-contentful-paint']['displayValue'])
            SPEED_INDEX = str(audits['speed-index']['displayValue'])
            TOTAL_BLOCKING_TIME = str(audits['total-blocking-time']['displayValue']).replace(',', '')
            JAVA_EXECUTION_TIME = str(audits['bootup-time']['displayValue']).replace(',', '')
            REMOVE_UNUSED_JS = str(audits['unused-javascript']['displayValue']).replace(',', '').replace('Potential savings of ', '')
            SERVER_INITIAL_RESPONSE_TIME = str(audits['server-response-time']['displayValue']).replace(',', '').replace('Root document took ', '')
            DOM_SIZE = str(audits['dom-size']['displayValue']).replace(',', '').replace(' elements', '')

            # DataFrame.append was removed in pandas 2.x; build the row with pd.concat instead.
            row = pd.DataFrame([{'Date': date, 'URL': ID, 'First Contentful Paint': FCP, 'Time to Interactive': TTI, 'Cumulative layout Shift': CLS, 'First Meaningful Paint': FMP, 'Largest Contentful Paint': LCP, 'Speed Index': SPEED_INDEX, 'Total Blocking Time': TOTAL_BLOCKING_TIME, 'Java Execution Time': JAVA_EXECUTION_TIME, 'Remove Unused JavaScript': REMOVE_UNUSED_JS, 'Server Initial Response Time': SERVER_INITIAL_RESPONSE_TIME, 'DOM_size': DOM_SIZE, 'device': device}])
            lighthouse = pd.concat([lighthouse, row], ignore_index=True)
            lighthouse.drop_duplicates(keep='first', inplace=True)

            midtime = time.perf_counter()
            print("Query complete. Time: %s" % (midtime - start))
            break

        except requests.ConnectionError:
            print(f'Connection error for {url} with strategy {device}')
            continue
        except requests.Timeout:
            print(f'OOPS!! Timeout error: {url}')
            continue
        except requests.RequestException:
            print(f'OOPS!! General error: {url}')
            continue
        except KeyboardInterrupt:
            print(f'Someone closed the program: {url}')
            time.sleep(5)
            print("Was a nice sleep, now let me continue...")
            continue

    # Serialize file appends so concurrent threads don't interleave rows in the CSV.
    with csv_lock:
        lighthouse.to_csv('testing1.csv', index=False, mode='a', header=False)


# One thread per (url, device) pair; starts are spaced out to throttle the API calls.
threads = [threading.Thread(target=role_session, args=(url, device))
           for url, device in itertools.product(urls, devices)]
print(len(threads))
for thread in threads:
    thread.start()
    sleep(5)           # conventional sleep() between thread starts
    Event().wait(5.0)  # wait() method, usable without an explicit thread
for thread in threads:
    thread.join()


end = time.perf_counter()
print("Elapsed Time: %s" % (end - start))
