0

所以这里我有异步刮板

import asyncio
import datetime
import json
import os
import time

import aiohttp
import xmltodict  # type: ignore
from bs4 import BeautifulSoup  # type: ignore

t0 = time.time()
BASE_URL = "https://markets.businessinsider.com"
HEADERS = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)\
                   Chrome/94.0.4606.61 Safari/537.36',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,\
               */*;q=0.8,application/signed-exchange;v=b3;q=0.9'
}


current_date = datetime.datetime.now().strftime('%d.%m.%Y')
# dict for currencies
current_rate = {}
# list for company data
companies_data = []


def round_float(float_num):
    """Rounds incoming float to 2 digits after floating point"""
    return float(f"{float_num:.2f}")


async def get_current_rate(session, id):
    """
    function takes an id that corresponds to certaing currency
    list of ids you can find here http://www.cbr.ru/scripts/XML_val.asp?d=0
    it returns current exchange rate in rubles
    """
    curr_rates_url = f'https://www.cbr.ru/scripts/XML_daily.asp?date_req={current_date}'
    async with session.get(url=curr_rates_url, headers=HEADERS) as response:
        response_text = await response.text()
        currencies = xmltodict.parse(response_text)
        for i in currencies['ValCurs']['Valute']:
            if i['@ID'] == id:
                current_rate['USD'] = float(i['Value'].replace(',', '.'))
                break


async def parse_details_page(session, url):
    """
    function parses details page of company
    return it s code, P/E and potential profit
    potential profit is calcilated from week_high and week_low
    if there no data for these variables potential profit cant be calculated
    """
    async with session.get(url=url, headers=HEADERS) as response:
        response_text = await response.text()
        soup = BeautifulSoup(response_text, 'lxml')
        code = soup.find('span', class_='price-section__category').text.strip().split()[-1]
        pe = float(soup.find('div', class_='snapshot__data-item')
                       .find(text=True).strip().replace(',', ''))
        try:
            week_high = float(soup.find('div', class_='snapshot__data-item--right')
                                  .find(text=True).strip().replace(',', ''))
        except AttributeError:
            week_high = None
        try:
            week_low = float(soup.find('div', class_='snapshot__data-item')
                                 .find(text=True).strip().replace(',', ''))
        except AttributeError:
            week_low = None
        if week_high and week_low:
            potential_profit = round_float(week_high - week_low)
        else:
            potential_profit = 'No data for calculating potential profit'
        return (code, pe, potential_profit)


async def get_page_data(session, page, current_rate):
    page_url = BASE_URL + f'/index/components/s&p_500?p={str(page)}'
    # # async requests with  aiohttp
    async with session.get(url=page_url, headers=HEADERS) as response:
        response_text = await response.text()

        # #  Getting data from soup object
        soup = BeautifulSoup(response_text, 'lxml')
        company_items = soup.find('tbody', class_="table__tbody").find_all("tr")
        for ci in company_items:
            company_data = ci.find_all('td')
            details_page_link = BASE_URL + company_data[0].find('a').attrs['href']

            company_name = company_data[0].find("a").text.strip()
            curr_price = round_float(float(company_data[1].text.split()[0]
                                                          .strip()
                                                          .replace(',', '')) * current_rate)
            code, pe, potential_profit = await parse_details_page(session, details_page_link)
            growth = round_float(float(company_data[-1].find("span").text.strip().replace(',', '')))

            companies_data.append(
                {
                    "code": code,
                    "name": company_name,
                    "price": curr_price,
                    "P/E": pe,
                    "growth": growth,
                    "potential-profit": potential_profit
                }
            )

    print(f"[INFO] {page} page is parsed")
    
    return companies_data


async def gather_data():
    # creating session
    async with aiohttp.ClientSession() as session:

        await get_current_rate(session, 'R01235')
        # parsing pagination
        response = await session.get(url=BASE_URL + '/index/components/s&p_500', headers=HEADERS)
        soup = BeautifulSoup(await response.text(), 'lxml')
        pages_count = int(soup.find("div", class_="finando_paging margin-top--small")
                              .find_all("a")[-2].text)

        # task queue
        tasks = []

        # scrapper works for every page
        for page in range(1, pages_count + 1):
            task = asyncio.create_task(get_page_data(session, page, current_rate['USD']))
            tasks.append(task)

        await asyncio.gather(*tasks)


def most_expensv_cmpns(data):
    """filteres data by most expensive price"""
    return sorted(data, key=lambda x: x['price'], reverse=True)[0:10]


def lowest_pe(data):
    """filteres data by lowest P/E"""
    return sorted(data, key=lambda x: x['P/E'])[0:10]


def highest_growth(data):
    """filteres data by highest growth"""
    return sorted(data, key=lambda x: x['growth'], reverse=True)[0:10]


def highest_profit(data):
    """filteres data by highest potential-profit"""
    filtered_data = list(filter(lambda x: isinstance(x['potential-profit'], float), data))
    return sorted(filtered_data, key=lambda x: x['potential-profit'], reverse=True)[0:10]


def create_file(name, filter_func):
    """Creates file with filtered data"""
    with open(name, "w", encoding="utf-8") as file:
        json.dump(filter_func(companies_data), file, indent=4, ensure_ascii=False)


def main():
    # if os is windows set corresponding event loop policy
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(gather_data())
    # creating json files
    create_file(f'10_most_expensive_stocks_{current_date}.json', most_expensv_cmpns)
    create_file(f'10_lowest_p_e_{current_date}.json', lowest_pe)
    create_file(f'10_highest_growth{current_date}.json', highest_growth)
    create_file(f'10_highest_profit{current_date}.json', highest_profit)


if __name__ == '__main__':
    main()
    print(time.time() - t0)

https://markets.businessinsider.com/index/components/s&p_500这是我需要解析的网站

所以 get_page_data 函数解析每个页面,在这个函数里面 parse_details_page 解析这个页面上每个公司的数据

我至少有两个需要模拟的请求,但我无法将头绕在内部函数内部的模拟 url 上。

我知道应该用 requests-mock 和 pytest-asyncio 来完成,但不知道怎么做

如果您能给我一个提示,我将不胜感激!

4

0 回答 0