我正在使用 Python requests 包上传一个大文件,但我找不到任何方法来返回有关上传进度的数据。我已经看到许多用于下载文件的进度表,但这些不适用于文件上传。
def progress(percent):
    """Print the upload progress value handed over by the uploader.

    Args:
        percent: Progress value to display; it is printed unchanged.
    """
    # A single-argument print(...) produces the same output on Python 2 and 3.
    print(percent)
# NOTE(review): `callback=` is the interface the question is asking for; no
# such parameter is defined anywhere in the visible code — confirm against the
# requests documentation before treating this line as runnable.
r = requests.post(URL, files={'f':hugeFileHandle}, callback=progress)
在此先感谢您的帮助 :)
我正在使用 Python requests 包上传一个大文件,但我找不到任何方法来返回有关上传进度的数据。我已经看到许多用于下载文件的进度表,但这些不适用于文件上传。
def progress(percent):
    """Print the upload progress value handed over by the uploader.

    Args:
        percent: Progress value to display; it is printed unchanged.
    """
    # A single-argument print(...) produces the same output on Python 2 and 3.
    print(percent)
# NOTE(review): `callback=` is the interface the question is asking for; no
# such parameter is defined anywhere in the visible code — confirm against the
# requests documentation before treating this line as runnable.
r = requests.post(URL, files={'f':hugeFileHandle}, callback=progress)
在此先感谢您的帮助 :)
import os
import sys
import requests # pip install requests
class upload_in_chunks(object):
    """Iterable that yields a file chunk by chunk and reports progress.

    The object is sized (``len()`` is the file size in bytes) so that a
    consumer can announce the total body length up front.

    Args:
        filename: Path of the file to stream.
        chunksize: Maximum number of bytes yielded per chunk.
    """

    def __init__(self, filename, chunksize=1 << 13):
        self.filename = filename
        self.chunksize = chunksize
        self.totalsize = os.path.getsize(filename)
        self.readsofar = 0

    def __iter__(self):
        """Yield the file's bytes in order, writing a percentage to stderr."""
        # Start every pass from zero so a second iteration does not report
        # more than 100 %.
        self.readsofar = 0
        with open(self.filename, 'rb') as fh:
            while True:
                data = fh.read(self.chunksize)
                if not data:
                    # End of file: terminate the progress line and stop.
                    sys.stderr.write("\n")
                    break
                self.readsofar += len(data)
                percent = self.readsofar * 1e2 / self.totalsize
                # "\r" rewinds to the line start so the figure updates in place.
                sys.stderr.write("\r{percent:3.0f}%".format(percent=percent))
                yield data

    def __len__(self):
        """Return the total file size in bytes."""
        return self.totalsize
# XXX fails
# NOTE(review): the author marks this direct use of the chunk iterable as the
# failing case; the text right after it introduces a file-like adapter as the
# workaround.
r = requests.post("http://httpbin.org/post",
data=upload_in_chunks(__file__, chunksize=10))
要解决此问题,您可以创建一个类似于 urllib2 POST 进度监控的文件适配器:
class IterableToFileAdapter(object):
    """Expose a sized iterable of byte chunks through a file-like ``read``.

    Args:
        iterable: Object supporting both ``iter()`` and ``len()``; ``len()``
            is reported unchanged as the adapter's own length.
    """

    def __init__(self, iterable):
        self.iterator = iter(iterable)
        self.length = len(iterable)
        # Holds the tail of a chunk that was larger than the requested size.
        self._buffer = b''

    def read(self, size=-1):
        """Return the next piece of data, or ``b''`` once exhausted.

        Args:
            size: Maximum number of bytes to return. ``None`` or a negative
                value returns the whole pending chunk.

        Returns:
            At most ``size`` bytes. Chunks are never merged, so a call may
            return fewer bytes than requested.
        """
        if not self._buffer:
            self._buffer = next(self.iterator, b'')
        if size is None or size < 0:
            data, self._buffer = self._buffer, b''
        else:
            # Keep whatever exceeds ``size`` for the following call.
            data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data

    def __len__(self):
        """Return the length captured from the wrapped iterable."""
        return self.length
# Stream this very source file in 10-byte chunks through the file-like adapter.
it = upload_in_chunks(__file__, 10)
r = requests.post("http://httpbin.org/post", data=IterableToFileAdapter(it))
# pretty print
import json
# Response.json must be called: passing the bound method itself to json.dump
# raises TypeError because a method is not JSON-serializable.
json.dump(r.json(), sys.stdout, indent=4, ensure_ascii=False)
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import requests
def my_callback(monitor):
    """Report upload progress by printing the monitor's byte counter.

    Args:
        monitor: Object exposing a ``bytes_read`` attribute.
    """
    # Your callback function
    print(monitor.bytes_read)
# Keep the source file open only for the duration of the upload so the handle
# is closed even if the request raises.
with open('file.py', 'rb') as source:
    e = MultipartEncoder(
        fields={'field0': 'value', 'field1': 'value',
                'field2': ('filename', source, 'text/plain')}
    )
    # The monitor wraps the encoder and is given my_callback as its callback.
    m = MultipartEncoderMonitor(e, my_callback)
    r = requests.post('http://httpbin.org/post', data=m,
                      headers={'Content-Type': m.content_type})
我从这里得到了它的代码:PyQt 中的简单文件上传进度条。我对其进行了一些更改,以使用 BytesIO 而不是 StringIO。
class CancelledError(Exception):
    """Raised to signal that an upload was cancelled.

    Args:
        msg: Human-readable reason; returned verbatim by ``str()`` and
            ``repr()``.
    """

    def __init__(self, msg):
        self.msg = msg
        Exception.__init__(self, msg)

    def __str__(self):
        return self.msg

    # repr() deliberately shows the bare message as well.
    __repr__ = __str__
from io import BytesIO


class BufferReader(BytesIO):
    """In-memory byte stream that reports how much of it has been read.

    Args:
        buf: The complete payload to serve.
        callback: Optional callable invoked after every ``read``.
        cb_args: Positional arguments forwarded to ``callback``.
        cb_kwargs: Extra keyword arguments forwarded to ``callback``. The
            mapping is copied, so the caller's dict is never modified.

    The callback additionally receives the keyword arguments ``size`` (total
    payload length) and ``progress`` (bytes read so far).
    """

    def __init__(self, buf=b'', callback=None, cb_args=(), cb_kwargs=None):
        self._callback = callback
        self._cb_args = cb_args
        self._cb_kwargs = dict(cb_kwargs) if cb_kwargs else {}
        self._progress = 0
        self._len = len(buf)
        BytesIO.__init__(self, buf)

    def __len__(self):
        """Return the total payload length in bytes."""
        return self._len

    def read(self, n=-1):
        """Read up to ``n`` bytes and notify the callback.

        Raises:
            CancelledError: If the callback raises an exception; an exception
                from the callback is treated as a request to abort the upload.
        """
        chunk = BytesIO.read(self, n)
        self._progress += int(len(chunk))
        self._cb_kwargs.update({
            'size': self._len,
            'progress': self._progress
        })
        if self._callback:
            try:
                self._callback(*self._cb_args, **self._cb_kwargs)
            except Exception:  # catches exception from the callback
                raise CancelledError('The upload was cancelled.')
        return chunk
def progress(size=None, progress=None):
    """Print the two progress figures as ``"<size> / <progress>"``.

    Args:
        size: First value printed.
        progress: Second value printed.
    """
    print("{0} / {1}".format(size, progress))
# Read the payload inside a context manager so the file handle is released
# as soon as its content is in memory.
with open("file.bin", 'rb') as upload_source:
    files = {"upfile": ("file.bin", upload_source.read())}
# Build the multipart body and its matching Content-Type by hand so the body
# can be wrapped in a progress-reporting reader.
(data, ctype) = requests.packages.urllib3.filepost.encode_multipart_formdata(files)
headers = {
    "Content-Type": ctype
}
body = BufferReader(data, progress)
requests.post(url, data=body, headers=headers)
诀窍是:使用 urllib3 中的 encode_multipart_formdata(),从文件列表手动生成请求体数据和请求头(headers)。
import requests
import tqdm
with open(file_name, 'rb') as f:
    # `import tqdm` binds the module, which is not callable; the progress
    # wrapper is the class tqdm.tqdm.
    r = requests.post(url, data=tqdm.tqdm(f.readlines()))
通常,您会构建一个流式数据源(一个生成器),它分块读取文件并在读取过程中报告进度(参见 kennethreitz/requests#663)。但这不适用于 requests 的 files API,因为 requests 不支持流式上传(请参阅 kennethreitz/requests#295)——要上传的文件必须先完整读入内存,然后才能开始处理。
不过,正如 J.F. Sebastian 之前演示的那样,requests 可以从生成器流式传输内容,但该生成器需要生成完整的数据流,包括多部分(multipart)编码和边界。这正是 poster 派上用场的地方。
poster 最初是为配合 Python 的 urllib2 使用而编写的,它支持以流式方式生成多部分请求,并在生成过程中提供进度指示。poster 主页提供了与 urllib2 一起使用的示例,但您真的不会想用 urllib2。看看这个使用 urllib2 进行 HTTP 基本身份验证的示例代码就知道了——太可怕了。
# load requests-module, a streamlined http-client lib
import requests
# load posters encode-function
from poster.encode import multipart_encode
# an adapter which makes the multipart-generator issued by poster accessible to requests
# based upon code from http://stackoverflow.com/a/13911048/1659732
class IterableToFileAdapter(object):
    """File-like wrapper around an iterable that exposes a ``total`` attribute.

    Args:
        iterable: Source of data chunks; its ``total`` attribute is reported
            as the adapter's length.
    """

    def __init__(self, iterable):
        self.iterator = iter(iterable)
        self.length = iterable.total

    def read(self, size=-1):
        """Return the next chunk, or ``b''`` once the iterable is exhausted.

        ``size`` is accepted for file-object compatibility but is not used;
        each call returns one whole chunk.
        """
        return next(self.iterator, b'')

    def __len__(self):
        """Return the ``total`` captured from the wrapped iterable."""
        return self.length
# define a helper function simulating the interface of poster's multipart_encode()-function
# but wrapping its generator with the file-like adapter
def multipart_encode_for_requests(params, boundary=None, cb=None):
    """Encode ``params`` with ``multipart_encode`` and wrap the generator.

    Args:
        params: Passed through to ``multipart_encode``.
        boundary: Passed through to ``multipart_encode``.
        cb: Passed through to ``multipart_encode``.

    Returns:
        A ``(body, headers)`` tuple where ``body`` is the data generator
        wrapped in ``IterableToFileAdapter`` and ``headers`` is returned as
        produced by ``multipart_encode``.
    """
    datagen, headers = multipart_encode(params, boundary, cb)
    return IterableToFileAdapter(datagen), headers
# this is your progress callback
def progress(param, current, total):
    """Print one progress line for the parameter currently being sent.

    Args:
        param: Object with ``name`` and ``filename`` attributes; a falsy
            value means there is nothing to report and the call is a no-op.
        current: Number of units sent so far (formatted as an integer).
        total: Total number of units to send (formatted as an integer).
    """
    if not param:
        return

    # check out http://tcd.netinf.eu/doc/classnilib_1_1encode_1_1MultipartParam.html
    # for a complete list of the properties param provides to you
    # A zero total would otherwise raise ZeroDivisionError.
    percent = float(current) / float(total) * 100 if total else 0.0
    print("{0} ({1}) - {2:d}/{3:d} - {4:.2f}%".format(param.name, param.filename, current, total, percent))
# generate headers and data-generator in a requests-compatible format
# and provide our progress-callback
# Both inputs stay open for the whole upload and are closed afterwards, even
# if encoding or the request raises.
with open('recordings/really-large.mp4', "rb") as first_input, \
        open('recordings/even-larger.mp4', "rb") as second_input:
    datagen, headers = multipart_encode_for_requests({
        "input_file": first_input,
        "another_input_file": second_input,
        "field": "value",
        "another_field": "another_value",
    }, cb=progress)

    # use the requests-lib to issue a post-request with our data attached
    # NOTE(review): the target URL is missing from the visible code; the
    # httpbin endpoint below is a stand-in — replace it with the real upload URL.
    r = requests.post(
        'http://httpbin.org/post',
        auth=('user', 'password'),
        data=datagen,
        headers=headers
    )

# show response-code and -body
print("{0} {1}".format(r, r.text))
我的上传服务器不支持分块(chunked)传输编码,所以我想出了这个解决方案。它基本上只是对 Python IOBase 的一层包装。
import io
import requests
from typing import Union
from tqdm import tqdm
from tqdm.utils import CallbackIOWrapper
from typing import Iterable


class UploadChunksIterator(Iterable):
    """
    This is an interface between python requests and tqdm.
    Make tqdm to be accessed just like IOBase for requests lib.

    Args:
        file: Object whose ``read(n)`` returns the next ``n`` bytes (or fewer)
            and an empty value at end of input.
        total_size: Value reported by ``len()``.
        chunk_size: Number of bytes requested from ``file`` per iteration step.
    """

    def __init__(
        self, file: "Union[io.BufferedReader, CallbackIOWrapper]", total_size: int, chunk_size: int = 16 * 1024
    ):  # 16 KiB
        self.file = file
        self.chunk_size = chunk_size
        self.total_size = total_size

    def __iter__(self):
        return self

    def __next__(self):
        data = self.file.read(self.chunk_size)
        if not data:
            raise StopIteration
        return data

    # we don't retrieve len from io.BufferedReader because CallbackIOWrapper only has read() method.
    def __len__(self):
        return self.total_size
fp = "data/mydata.mp4"
# NOTE(review): placeholder value — presumably this must be a complete URL
# including the scheme; confirm before running.
s3url = "example.com"
_quiet = False
with open(fp, "rb") as f:
    total_size = os.fstat(f.fileno()).st_size
    if not _quiet:
        # Wrap read() so each chunk pulled by the uploader advances the bar.
        # The original description name was undefined; the file path is used.
        f = tqdm.wrapattr(f, "read", desc=fp, miniters=1, total=total_size, ascii=True)
    with f as f_iter:
        res = requests.put(
            url=s3url,
            data=UploadChunksIterator(f_iter, total_size=total_size),
        )
在 @jfs 答案的基础上做了改进,使进度条显示的信息更丰富。
import math
import os
import requests
import sys
class ProgressUpload:
    """Iterable over a file's chunks that draws a progress line on stderr.

    Args:
        filename: Path of the file to stream.
        chunk_size: Maximum number of bytes yielded per chunk.

    Attributes set in the constructor:
        file_size: Size of the file in bytes (also returned by ``len()``).
        size_read: Number of bytes yielded so far.
        unit / divisor: Display unit ('B', 'KB', 'MB' or 'GB') and the power
            of ten that converts bytes into that unit.
    """

    def __init__(self, filename, chunk_size=1250):
        self.filename = filename
        self.chunk_size = chunk_size
        self.file_size = os.path.getsize(filename)
        self.size_read = 0
        # Pick the unit with integer arithmetic: a float logarithm fails on an
        # empty file and rounds exact powers of 1000 down to the smaller unit.
        exponent = 0
        remaining = self.file_size
        while remaining >= 1000 and exponent < 9:  # cap unit at a GB
            remaining //= 1000
            exponent += 3
        self.unit = {0: 'B', 3: 'KB', 6: 'MB', 9: 'GB'}[exponent]
        self.divisor = 10 ** exponent

    def __iter__(self):
        """Yield the file chunk by chunk, refreshing the progress text."""
        progress_str = f'0 / {self.file_size / self.divisor:.2f} {self.unit} (0 %)'
        sys.stderr.write(f'\rUploading {self.filename}: {progress_str}')
        with open(self.filename, 'rb') as f:
            for chunk in iter(lambda: f.read(self.chunk_size), b''):
                self.size_read += len(chunk)
                yield chunk
                # Step back over the previous figures, then overwrite them.
                sys.stderr.write('\b' * len(progress_str))
                percentage = self.size_read / self.file_size * 100
                completed_str = f'{self.size_read / self.divisor:.2f}'
                to_complete_str = f'{self.file_size / self.divisor:.2f} {self.unit}'
                progress_str = f'{completed_str} / {to_complete_str} ({percentage:.2f} %)'
                sys.stderr.write(progress_str)

    def __len__(self):
        """Return the file size in bytes."""
        return self.file_size
# sample usage
# Passes a ProgressUpload over 'file_path' as the body of the POST request.
# NOTE(review): upload_url is not defined in the visible code — supply it
# before running.
requests.post(upload_url, data=ProgressUpload('file_path'))
请注意 `__len__` 方法。没有它,我会收到连接被关闭的错误。这也是您不能只用 tqdm + iter 来获得简单进度条的唯一原因。
from pathlib import Path
from tqdm import tqdm
import requests
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
def upload_file(upload_url, fields, filepath):
    """Upload a file as multipart form data while showing a tqdm bar.

    Args:
        upload_url: URL the form is POSTed to.
        fields: Mapping of additional form fields. It is copied, so the
            caller's dict is left untouched.
        filepath: Path of the file sent as the ``"file"`` field.

    Returns:
        The value returned by ``requests.post``.
    """
    path = Path(filepath)
    total_size = path.stat().st_size
    filename = path.name

    # Work on a copy so the caller's dict never ends up holding a closed
    # file handle.
    form_fields = dict(fields)

    with tqdm(
        desc=filename,
        total=total_size,
        unit="B",
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        with open(filepath, "rb") as f:
            form_fields["file"] = ("filename", f)
            e = MultipartEncoder(fields=form_fields)
            # bytes_read is cumulative while bar.update() expects an
            # increment, hence the subtraction of what is already shown.
            m = MultipartEncoderMonitor(
                e, lambda monitor: bar.update(monitor.bytes_read - bar.n)
            )
            headers = {"Content-Type": m.content_type}
            return requests.post(upload_url, data=m, headers=headers)
upload_url = 'https://uploadurl'
# NOTE(review): value1 / value2 are not defined in the visible code —
# presumably placeholders for the real form values.
fields = {
    "field1": value1,
    "field2": value2
}
filepath = '97a6fce8_owners_2018_Van Zandt.csv'
upload_file(upload_url, fields, filepath)