
I am trying to implement a fast downloader in Python. I get an error when using aiofiles:

async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
    for chunk in await response.content.read(self.__chunk_size):
        if chunk:  # filter out keep-alive new chunks
            await fd.write(chunk)

The error:

Traceback (most recent call last):
  File "C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py", line 119, in fetch_content
    await fd.write(chunk)
  File "C:\Users\Khav\AppData\Roaming\Python\Python36\site-packages\aiofiles\threadpool\utils.py", line 36, in method
    return (yield from self._loop.run_in_executor(self._executor, cb))
  File "C:\Users\Khav\AppData\Local\Programs\Python\Python36-32\lib\concurrent\futures\thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
TypeError: a bytes-like object is required, not 'int'
Task exception was never retrieved
future: <Task finished coro=<AsyncDownloader.fetch_content() done, defined at C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py:105> exception=TypeError("a bytes-like object is required, not 'int'",)>
Traceback (most recent call last):
  File "C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py", line 119, in fetch_content
    await fd.write(chunk)
  File "C:\Users\Khav\AppData\Roaming\Python\Python36\site-packages\aiofiles\threadpool\utils.py", line 36, in method
    return (yield from self._loop.run_in_executor(self._executor, cb))
  File "C:\Users\Khav\AppData\Local\Programs\Python\Python36-32\lib\concurrent\futures\thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
TypeError: a bytes-like object is required, not 'int'

Full code:

import pandas as pd
import os
import pathlib
import fire
from aiohttp.resolver import AsyncResolver
import aiohttp
import asyncio
import aiofiles
from timeit import default_timer as timer


class AsyncDownloader:
    """Download files asynchronously"""

    __urls = set()
    __dest_path = None
    __user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
    __read_timeout = 60
    __connection_timeout = 30
    __download_count = 0  # unlimited
    # http://www.browserscope.org/?category=network
    __worker_count = 5  # No of threads to spawn
    __chunk_size = 512
    __download_time = -1
    __errors = []
    __success_urls = []

    # TODO Fetch only content of a specific type from a csv
    # TODO Improve code structure so that it can be used as a commandline tool (done)
    # https://dbader.org/blog/python-commandline-tools-with-click
    # https://github.com/google/python-fire

    def set_source_csv(self, source_path, column_name):
        self.source_path = source_path
        self.column_name = column_name

        try:
            my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
        except ValueError:
            print("The column name doesn't exist")
            return self
        else:
            # No exception whatsoever
            for chunk in my_csv:
                self.__urls.update(set(getattr(chunk, self.column_name)))
        return self

    def set_destination_path(self, dest_path):
        if dest_path.endswith('/'):
            dest_path = dest_path[:-1]
        self.dest_path = dest_path
        # TODO Add exception in case we can't create the directory
        pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
        if os.access(self.dest_path, os.W_OK):
            self.__dest_path = pathlib.Path(self.dest_path).resolve()
        return self

    def set_user_agent(self, useragent):
        self.useragent = useragent
        self.__user_agent = self.useragent
        return self

    def set_connection_timeout(self, ctimeout_secs):
        self.timeout_secs = ctimeout_secs
        if self.timeout_secs >= 0:
            self.__connection_timeout = self.timeout_secs
        return self

    def set_read_timeout(self, rtimeout_secs):
        self.timeout_secs = rtimeout_secs
        if self.timeout_secs >= 0:
            self.__read_timeout = self.timeout_secs
        return self

    def set_download_count(self, file_count):
        self.file_count = file_count
        if self.file_count > 0:
            self.__download_count = self.file_count
        return self

    def set_worker_count(self, worker_count):
        self.worker_count = worker_count
        if self.worker_count > 0:
            self.__worker_count = self.worker_count
        return self

    def set_chunk_size(self, chunk_size):
        self.chunk_size = chunk_size
        if self.chunk_size > 0:
            self.__chunk_size = self.chunk_size
        return self

    def print_urls(self):
        print(self.__urls)
        return self

    def get_download_time_secs(self):
        print(self.__download_time)
        return self

    def get_errors(self):
        print(self.__errors)
        return self

    async def fetch_content(self, url):
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.167 Safari/537.36'}
        resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
        conn = aiohttp.TCPConnector(limit=self.__worker_count, ttl_dns_cache=300, verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn,
                                         read_timeout=self.__read_timeout,
                                         conn_timeout=self.__connection_timeout) as session:
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    filename = os.path.basename(str(response.url))
                    async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
                        for chunk in await response.content.read(self.__chunk_size):
                            if chunk:  # filter out keep-alive new chunks
                                await fd.write(chunk)

    def download(self):

        if not pathlib.Path(self.__dest_path).exists():
            return "OS error : Directory to save file is not defined or does not exist"

        # Give an accurate file count even if we don't have to download it as it already exist
        file_count = 0

        urls_to_download = []

        for url in self.__urls:
            filename = os.path.basename(url)
            # check if we need only a limited number of files
            if self.__download_count != 0:
                if file_count < self.__download_count:
                    # No need to download file if it already exist
                    if pathlib.Path(self.__dest_path / filename).is_file():
                        file_count += 1
                        continue
                    else:
                        file_count += 1
                        urls_to_download.append(url)
            else:
                # Download all urls
                if not pathlib.Path(self.__dest_path / filename).is_file():
                    urls_to_download.append(url)

        loop = asyncio.get_event_loop()

        tasks = []
        for url in urls_to_download:
            task = asyncio.ensure_future(self.fetch_content(url))
            tasks.append(task)
        start = timer()
        loop.run_until_complete(asyncio.wait(tasks))
        loop.run_until_complete(asyncio.sleep(0.250))  # for ssl connection
        loop.close()
        end = timer()
        self.__download_time = end - start

    def help(self):
        """Usage will go here"""
        print("This is help section")
        return self


if __name__ == '__main__':
    fire.Fire(AsyncDownloader)

1 Answer


I don't think you can loop over the return value of response.content.read. I'm guessing you meant to loop over the chunks instead, no? Maybe something like this will work:

chunk = await response.content.read(self.__chunk_size)
while chunk:
    await fd.write(chunk)
    chunk = await response.content.read(self.__chunk_size)
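The reason for the TypeError: in Python 3, iterating over a bytes object yields integers, one per byte, so the original for loop was handing an int to fd.write. A quick demonstration:

data = b"abc"
for item in data:
    print(item)        # 97, 98, 99 -- ints, not bytes
print(type(data[0]))   # <class 'int'>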
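Alternatively, aiohttp's response stream also exposes iter_chunked, which yields bytes chunks directly. A minimal sketch of the same write loop inside fetch_content, assuming the names from the question's code (self.__dest_path, self.__chunk_size, filename):

async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
    # iter_chunked yields non-empty bytes chunks until EOF, then stops
    async for chunk in response.content.iter_chunked(self.__chunk_size):
        await fd.write(chunk)

Both versions terminate cleanly at end of stream: read() returns b'' (falsy), and iter_chunked simply stops iterating, so no explicit keep-alive filter is needed.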

Answered 2018-02-17T06:31:07.357