我正在尝试在 python 中实现快速下载器。使用 aiofiles 时出现错误
async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
for chunk in await response.content.read(self.__chunk_size):
if chunk: # filter out keep-alive new chunks
await fd.write(chunk)
错误:
Traceback (most recent call last):
File "C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py", line 119, in fetch_content
await fd.write(chunk)
File "C:\Users\Khav\AppData\Roaming\Python\Python36\site-packages\aiofiles\threadpool\utils.py", line 36, in method
return (yield from self._loop.run_in_executor(self._executor, cb))
File "C:\Users\Khav\AppData\Local\Programs\Python\Python36-32\lib\concurrent\futures\thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
TypeError: a bytes-like object is required, not 'int'
Task exception was never retrieved
future: <Task finished coro=<AsyncDownloader.fetch_content() done, defined at C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py:105> exception=TypeError("a bytes-like object is required, not 'int'",)>
Traceback (most recent call last):
File "C:\Users\Khav\PycharmProjects\AsyncDownloaderTest\AsyncDownloader.py", line 119, in fetch_content
await fd.write(chunk)
File "C:\Users\Khav\AppData\Roaming\Python\Python36\site-packages\aiofiles\threadpool\utils.py", line 36, in method
return (yield from self._loop.run_in_executor(self._executor, cb))
File "C:\Users\Khav\AppData\Local\Programs\Python\Python36-32\lib\concurrent\futures\thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
TypeError: a bytes-like object is required, not 'int'
完整代码
import pandas as pd
import os
import pathlib
import fire
from aiohttp.resolver import AsyncResolver
import aiohttp
import asyncio
import aiofiles
from timeit import default_timer as timer
class AsyncDownloader:
    """Download files referenced by URLs in a CSV column, asynchronously.

    Fluent API: every configuration method returns ``self`` so calls can be
    chained, which also makes the class usable as a python-fire command group,
    e.g. ``python AsyncDownloader.py set-source-csv ... download``.
    """

    # TODO Fetch only content of a specific type from a csv
    # TODO Improve code structure so that it can be used as a commandline tool (done)
    # https://dbader.org/blog/python-commandline-tools-with-click
    # https://github.com/google/python-fire

    def __init__(self):
        # State lives on the instance (not as class attributes) so that two
        # downloader instances never share the same mutable containers.
        self.__urls = set()
        self.__dest_path = None
        self.__user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
        self.__read_timeout = 60        # seconds
        self.__connection_timeout = 30  # seconds
        self.__download_count = 0       # 0 means unlimited
        # http://www.browserscope.org/?category=network
        self.__worker_count = 5         # max concurrent connections
        self.__chunk_size = 512         # bytes per read
        self.__download_time = -1       # seconds; -1 until download() has run
        self.__errors = []
        self.__success_urls = []

    def set_source_csv(self, source_path, column_name):
        """Collect the URL set from column *column_name* of the CSV at *source_path*."""
        self.source_path = source_path
        self.column_name = column_name
        try:
            # Read lazily in chunks so huge CSVs don't need to fit in memory at once.
            my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
        except ValueError:
            print("The column name doesn't exist")
            return self
        else:
            # No exception whatsoever
            for chunk in my_csv:
                self.__urls.update(set(getattr(chunk, self.column_name)))
        return self

    def set_destination_path(self, dest_path):
        """Create *dest_path* if needed and remember it (only if it is writable)."""
        if dest_path.endswith('/'):
            dest_path = dest_path[:-1]
        self.dest_path = dest_path
        # TODO Add exception in case we can't create the directory
        pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
        if os.access(self.dest_path, os.W_OK):
            # __dest_path stays None when the directory is not writable;
            # download() checks for that and refuses to run.
            self.__dest_path = pathlib.Path(self.dest_path).resolve()
        return self

    def set_user_agent(self, useragent):
        """Override the User-Agent header sent with every request."""
        self.useragent = useragent
        self.__user_agent = self.useragent
        return self

    def set_connection_timeout(self, ctimeout_secs):
        """Set the connection timeout in seconds (negative values are ignored)."""
        self.timeout_secs = ctimeout_secs
        if self.timeout_secs >= 0:
            self.__connection_timeout = self.timeout_secs
        return self

    def set_read_timeout(self, rtimeout_secs):
        """Set the read timeout in seconds (negative values are ignored)."""
        self.timeout_secs = rtimeout_secs
        if self.timeout_secs >= 0:
            self.__read_timeout = self.timeout_secs
        return self

    def set_download_count(self, file_count):
        """Limit how many files to download; values <= 0 keep 'unlimited' (0)."""
        self.file_count = file_count
        if self.file_count > 0:
            self.__download_count = self.file_count
        return self

    def set_worker_count(self, worker_count):
        """Set the maximum number of concurrent connections (must be > 0)."""
        self.worker_count = worker_count
        if self.worker_count > 0:
            self.__worker_count = self.worker_count
        return self

    def set_chunk_size(self, chunk_size):
        """Set the per-read chunk size in bytes (must be > 0)."""
        self.chunk_size = chunk_size
        if self.chunk_size > 0:
            self.__chunk_size = self.chunk_size
        return self

    def print_urls(self):
        """Print the collected URL set."""
        print(self.__urls)
        return self

    def get_download_time_secs(self):
        """Print how long the last download() call took (-1 if never run)."""
        print(self.__download_time)
        return self

    def get_errors(self):
        """Print the collected error list."""
        print(self.__errors)
        return self

    async def fetch_content(self, url):
        """Fetch *url* and stream the body to disk in __chunk_size pieces.

        Only 200 responses are saved; the file name is taken from the final
        (post-redirect) response URL.
        """
        # Honour set_user_agent() instead of a hard-coded Chrome string.
        headers = {'user-agent': self.__user_agent}
        # NOTE(review): the original built an AsyncResolver(8.8.8.8/8.8.4.4)
        # but never passed it to the connector, so it had no effect; removed.
        conn = aiohttp.TCPConnector(limit=self.__worker_count, ttl_dns_cache=300, verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn,
                                         read_timeout=self.__read_timeout,
                                         conn_timeout=self.__connection_timeout) as session:
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    filename = os.path.basename(str(response.url))
                    async with aiofiles.open(pathlib.Path(self.__dest_path / filename).resolve(), 'wb') as fd:
                        # BUG FIX: ``for chunk in await response.content.read(n)``
                        # iterates over a single bytes object, yielding ints —
                        # hence "a bytes-like object is required, not 'int'".
                        # Instead, keep calling read() until it returns empty
                        # bytes (EOF) and write each chunk as a whole.
                        while True:
                            chunk = await response.content.read(self.__chunk_size)
                            if not chunk:
                                break
                            await fd.write(chunk)

    def download(self):
        """Download every collected URL (honouring the download-count limit).

        Returns an error string when the destination directory is unset or
        missing; otherwise records the elapsed time in __download_time.
        """
        # Guard against __dest_path being None (set_destination_path never
        # called, or the directory was not writable) — Path(None) raises.
        if self.__dest_path is None or not pathlib.Path(self.__dest_path).exists():
            return "OS error : Directory to save file is not defined or does not exist"
        # Give an accurate file count even if we don't have to download it as it already exist
        file_count = 0
        urls_to_download = []
        for url in self.__urls:
            filename = os.path.basename(url)
            # check if we need only a limited number of files
            if self.__download_count != 0:
                if file_count < self.__download_count:
                    # No need to download file if it already exist
                    if pathlib.Path(self.__dest_path / filename).is_file():
                        file_count += 1
                        continue
                    else:
                        file_count += 1
                        urls_to_download.append(url)
            else:
                # Download all urls
                if not pathlib.Path(self.__dest_path / filename).is_file():
                    urls_to_download.append(url)
        # asyncio.wait() raises ValueError on an empty task set — nothing to do.
        if not urls_to_download:
            self.__download_time = 0
            return
        loop = asyncio.get_event_loop()
        tasks = []
        for url in urls_to_download:
            task = asyncio.ensure_future(self.fetch_content(url))
            tasks.append(task)
        start = timer()
        loop.run_until_complete(asyncio.wait(tasks))
        loop.run_until_complete(asyncio.sleep(0.250))  # for ssl connection
        loop.close()
        end = timer()
        self.__download_time = end - start

    def help(self):
        """Usage will go here"""
        print("This is help section")
        return self
if __name__ == '__main__':
    # Expose the AsyncDownloader fluent API as a command-line tool via
    # python-fire: each method becomes a chainable sub-command.
    fire.Fire(AsyncDownloader)