我正在尝试在 python 中实现快速下载器。使用 aiofiles 时出现错误
async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
for chunk in await response.content.read(self.__chunk_size):
if chunk: # filter out keep-alive new chunks
await fd.write(chunk)
Traceback (most recent call last):
File "C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py", line 119, in fetch_content
await fd.write(chunk)
File "C:\Users\Khav\AppData\Roaming\Python\Python36\site-packages\aiofiles\threadpool\utils.py", line 36, in method
return (yield from self._loop.run_in_executor(self._executor, cb))
File "C:\Users\Khav\AppData\Local\Programs\Python\Python36-32\lib\concurrent\futures\thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
TypeError: a bytes-like object is required, not 'int'
import pandas as pd
import os
import pathlib
import fire
from aiohttp.resolver import AsyncResolver
import aiohttp
import asyncio
import aiofiles
from timeit import default_timer as timer
class AsyncDownloader:
"""Download files asynchronously"""
__urls = set()
__dest_path = None
__user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
__read_timeout = 60
__connection_timeout = 30
__download_count = 0 # unlimited
# http://www.browserscope.org/?category=network
__worker_count = 5 # No of threads to spawn
__chunk_size = 512
__download_time = -1
__errors = []
__success_urls = []
# TODO Fetch only content of a specific type from a csv
# TODO Improve code structure so that it can be used as a commandline tool (done)
# https://dbader.org/blog/python-commandline-tools-with-click
# https://github.com/google/python-fire
def set_source_csv(self, source_path, column_name):
self.source_path = source_path
self.column_name = column_name
my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
except ValueError:
print("The column name doesn't exist")
return self
# No exception whatsoever
for chunk in my_csv:
self.__urls.update(set(getattr(chunk, self.column_name)))
return self
def set_destination_path(self, dest_path):
if dest_path.endswith('/'):
dest_path = dest_path[:-1]
self.dest_path = dest_path
# TODO Add exception in case we can't create the directory
pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
if os.access(self.dest_path, os.W_OK):
self.__dest_path = pathlib.Path(self.dest_path).resolve()
return self
def set_user_agent(self, useragent):
self.useragent = useragent
self.__user_agent = self.useragent
return self
def set_connection_timeout(self, ctimeout_secs):
self.timeout_secs = ctimeout_secs
if self.timeout_secs >= 0:
self.__connection_timeout = self.timeout_secs
return self
def set_read_timeout(self, rtimeout_secs):
self.timeout_secs = rtimeout_secs
if self.timeout_secs >= 0:
self.__read_timeout = self.timeout_secs
return self
def set_download_count(self, file_count):
self.file_count = file_count
if self.file_count > 0:
self.__download_count = self.file_count
return self
def set_worker_count(self, worker_count):
self.worker_count = worker_count
if self.worker_count > 0:
self.__worker_count = self.worker_count
return self
def set_chunk_size(self, chunk_size):
self.chunk_size = chunk_size
if self.chunk_size > 0:
self.__chunk_size = self.chunk_size
return self
def print_urls(self):
return self
def get_download_time_secs(self):
return self
def get_errors(self):
return self
async def fetch_content(self, url):
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.167 Safari/537.36'}
resolver = AsyncResolver(nameservers=["", ""])
conn = aiohttp.TCPConnector(limit=self.__worker_count, ttl_dns_cache=300, verify_ssl=False)
async with aiohttp.ClientSession(connector=conn,
conn_timeout=self.__connection_timeout) as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
filename = os.path.basename(str(response.url))
async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
for chunk in await response.content.read(self.__chunk_size):
if chunk: # filter out keep-alive new chunks
await fd.write(chunk)
def download(self):
if not pathlib.Path(self.__dest_path).exists():
return "OS error : Directory to save file is not defined or does not exist"
# Give an accurate file count even if we don't have to download it as it already exist
file_count = 0
urls_to_download = []
for url in self.__urls:
filename = os.path.basename(url)
# check if we need only a limited number of files
if self.__download_count != 0:
if file_count < self.__download_count:
# No need to download file if it already exist
if pathlib.Path(self.__dest_path / filename).is_file():
file_count += 1
file_count += 1
# Download all urls
if not pathlib.Path(self.__dest_path / filename).is_file():
loop = asyncio.get_event_loop()
tasks = []
for url in urls_to_download:
task = asyncio.ensure_future(self.fetch_content(url))
start = timer()
loop.run_until_complete(asyncio.sleep(0.250)) # for ssl connection
end = timer()
self.__download_time = end - start
def help(self):
"""Usage will go here"""
print("This is help section")
return self
if __name__ == '__main__':