24

试图从requests.

有什么好方法可以做到这一点?

4

7 回答 7

29

requests故意包装这样的低级东西。通常,您唯一要做的就是验证证书是否有效。为此,只需通过verify=True. 如果你想使用非标准的 cacert 包,你也可以通过它。例如:

resp = requests.get('https://example.com', verify=True, cert=['/path/to/my/ca.crt'])

此外,requests主要是一组围绕其他库的包装器,主要是标准库(或者,对于 2.x,urllib3)和.http.clienthttplibssl

有时,答案只是获取较低级别的对象(例如,resp.rawurllib3.response.HTTPResponse),但在许多情况下这是不可能的。

这就是其中一种情况。唯一能看到证书的对象是 an http.client.HTTPSConnection(或 a urllib3.connectionpool.VerifiedHTTPSConnection,但这只是前者的子类)和 an ssl.SSLSocket,并且在请求返回时它们都不存在了。(connectionpool顾名思义,HTTPSConnection对象存储在池中,一旦完成就可以重用;SSLSocketHTTPSConnection.)

所以,你需要修补一些东西,这样你就可以将数据复制到链上。它可能就像这样简单:

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peercert = self._connection.sock.getpeercert()
    except AttributeError:
        pass
HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response
def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peercert = resp.peercert
    except AttributeError:
        pass
    return response
HTTPAdapter.build_response = new_HTTPAdapter_build_response

这是未经测试的,所以不能保证;你可能需要修补更多。

此外,子类化和覆盖可能比猴子补丁更干净(特别是因为HTTPAdapter它被设计为子类化)。

或者,更好的是,分叉urllib3requests修改你的分叉,并且(如果你认为这是合法有用的)向上游提交拉取请求。

无论如何,现在,从您的代码中,您可以执行以下操作:

resp.peercert

这将为您提供一个带有'subject''subjectAltName'键的字典,由pyopenssl.WrappedSocket.getpeercert. 如果您想了解有关证书的更多信息,请尝试Christophe Vandeplas 的这个答案的变体,它可以让您获得一个OpenSSL.crypto.X509对象。如果您想获得整个对等证书链,请参阅GoldenStake 的答案

当然,您可能还想传递验证证书所需的所有信息,但这更容易,因为它已经通过了顶层。

于 2013-06-03T19:56:38.933 回答
12

首先,阿巴纳特的回答非常完整。在追查Kalkran的提议connection-close问题时,我实际上发现它没有包含有关 SSL 证书的详细信息。peercert

我深入挖掘了连接和套接字信息,并提取了self.sock.connection.get_peer_certificate()包含以下功能的函数:

  • get_subject()用于 CN
  • get_notAfter()get_notBefore()到期日期
  • get_serial_number()以及get_signature_algorithm()加密相关的技术细节
  • ...

请注意,这些仅在您已pyopenssl在系统上安装时可用。在引擎盖下,如果可用,则使用标准库的urllib3模块,否则使用。下面显示的属性仅在是 a时存在,而不是在它是 a 时存在。您可以使用.pyopensslsslself.sock.connectionself.sockurllib3.contrib.pyopenssl.WrappedSocketssl.SSLSocketpyopensslpip install pyopenssl

完成后,代码变为:

import requests

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peer_certificate = self._connection.peer_certificate
    except AttributeError:
        pass
HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response
def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peer_certificate = resp.peer_certificate
    except AttributeError:
        pass
    return response
HTTPAdapter.build_response = new_HTTPAdapter_build_response

HTTPSConnection = requests.packages.urllib3.connection.HTTPSConnection
orig_HTTPSConnection_connect = HTTPSConnection.connect
def new_HTTPSConnection_connect(self):
    orig_HTTPSConnection_connect(self)
    try:
        self.peer_certificate = self.sock.connection.get_peer_certificate()
    except AttributeError:
        pass
HTTPSConnection.connect = new_HTTPSConnection_connect

您将能够轻松访问结果:

r = requests.get('https://yourdomain.tld', timeout=0.1)
print('Expires on: {}'.format(r.peer_certificate.get_notAfter()))
print(dir(r.peer_certificate))

如果像我一样,您想忽略 SSL 证书警告,只需在文件顶部添加以下内容并且不进行 SSL 验证:

from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

r = requests.get('https://yourdomain.tld', timeout=0.1, verify=False)
print(dir(r.peer_certificate))
于 2018-08-29T07:46:24.133 回答
6

感谢大家的精彩回答。

它帮助我过度设计了这个问题的答案:

如何将自定义 CA Root 证书添加到 Windows 中 Python 使用的 CA Store?

更新 2019-02-12

请查看Cert Human: SSL Certificates for Humans以对我的https://github.com/neozenith/get-ca-py项目由lifehackjim进行令人印象深刻的重写。

我现在已经存档了原始存储库。

独立片段

#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Get Certificates from a request and dump them.
"""

import argparse
import sys

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

"""
Inspired by the answers from this Stackoverflow question:
https://stackoverflow.com/questions/16903528/how-to-get-response-ssl-certificate-from-requests-in-python

What follows is a series of patching the low level libraries in requests.
"""

"""
https://stackoverflow.com/a/47931103/622276
"""

sock_requests = requests.packages.urllib3.contrib.pyopenssl.WrappedSocket


def new_getpeercertchain(self, *args, **kwargs):
    x509 = self.connection.get_peer_cert_chain()
    return x509


sock_requests.getpeercertchain = new_getpeercertchain

"""
https://stackoverflow.com/a/16904808/622276
"""

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__


def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peercertchain = self._connection.sock.getpeercertchain()
    except AttributeError:
        pass


HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response


def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peercertchain = resp.peercertchain
    except AttributeError:
        pass
    return response


HTTPAdapter.build_response = new_HTTPAdapter_build_response

"""
Attempt to wrap in a somewhat usable CLI
"""


def cli(args):
    parser = argparse.ArgumentParser(description="Request any URL and dump the certificate chain")
    parser.add_argument("url", metavar="URL", type=str, nargs=1, help="Valid https URL to be handled by requests")

    verify_parser = parser.add_mutually_exclusive_group(required=False)
    verify_parser.add_argument("--verify", dest="verify", action="store_true", help="Explicitly set SSL verification")
    verify_parser.add_argument(
        "--no-verify", dest="verify", action="store_false", help="Explicitly disable SSL verification"
    )
    parser.set_defaults(verify=True)

    return vars(parser.parse_args(args))


def dump_pem(cert, outfile="ca-chain.crt"):
    """Use the CN to dump certificate to PEM format"""
    PyOpenSSL = requests.packages.urllib3.contrib.pyopenssl
    pem_data = PyOpenSSL.OpenSSL.crypto.dump_certificate(PyOpenSSL.OpenSSL.crypto.FILETYPE_PEM, cert)
    issuer = cert.get_issuer().get_components()

    print(pem_data.decode("utf-8"))

    with open(outfile, "a") as output:
        for part in issuer:
            output.write(part[0].decode("utf-8"))
            output.write("=")
            output.write(part[1].decode("utf-8"))
            output.write(",\t")
        output.write("\n")
        output.write(pem_data.decode("utf-8"))


if __name__ == "__main__":
    cli_args = cli(sys.argv[1:])

    url = cli_args["url"][0]
    req = requests.get(url, verify=cli_args["verify"])
    for cert in req.peercertchain:
        dump_pem(cert)
于 2018-10-24T05:28:16.110 回答
3

这虽然一点也不漂亮,但有效:

import requests

req = requests.get('https://httpbin.org')
pool = req.connection.poolmanager.connection_from_url('https://httpbin.org')
conn = pool.pool.get()
# get() removes it from the pool, so put it back in
pool.pool.put(conn)
print(conn.sock.getpeercert())
于 2013-06-03T19:44:14.527 回答
2

首先,阿巴纳特的回答很完整

但我想补充一点,如果您正在寻找对等证书链,则需要修补另一段代码

import requests
sock_requests = requests.packages.urllib3.contrib.pyopenssl.WrappedSocket
def new_getpeercertchain(self,*args, **kwargs):
    x509 = self.connection.get_peer_cert_chain()
    return x509
sock_requests.getpeercertchain = new_getpeercertchain

之后,您可以以与接受的答案非常相似的方式调用它

HTTPResponse = requests.packages.urllib3.response.HTTPResponse
orig_HTTPResponse__init__ = HTTPResponse.__init__
def new_HTTPResponse__init__(self, *args, **kwargs):
    orig_HTTPResponse__init__(self, *args, **kwargs)
    try:
        self.peercertchain = self._connection.sock.getpeercertchain()
    except AttributeError:
        pass
HTTPResponse.__init__ = new_HTTPResponse__init__

HTTPAdapter = requests.adapters.HTTPAdapter
orig_HTTPAdapter_build_response = HTTPAdapter.build_response
def new_HTTPAdapter_build_response(self, request, resp):
    response = orig_HTTPAdapter_build_response(self, request, resp)
    try:
        response.peercertchain = resp.peercertchain
    except AttributeError:
        pass
    return response
HTTPAdapter.build_response = new_HTTPAdapter_build_response

你会得到resp.peercertchain其中包含tuple一个OpenSSL.crypto.X509对象

于 2017-12-21T18:55:25.463 回答
0

为了检索证书的详细信息,例如 CN 和到期日期,改编自此示例的以下脚本运行良好。它还避免了我得到的一些错误,我认为这些错误是由于请求和 urllib3 的不正确/不兼容版本:“AttributeError:'SSLSocket'对象没有属性'connection'”和“AttributeError:'VerifiedHTTPSConnection'对象没有属性'peer_certificate' "

from OpenSSL.SSL import Connection, Context, SSLv3_METHOD, TLSv1_2_METHOD
from datetime import datetime, time
import socket
host = 'www.google.com'
try:
    try:
        ssl_connection_setting = Context(SSLv3_METHOD)
    except ValueError:
        ssl_connection_setting = Context(TLSv1_2_METHOD)
    ssl_connection_setting.set_timeout(5)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, 443))
        c = Connection(ssl_connection_setting, s)
        c.set_tlsext_host_name(str.encode(host))
        c.set_connect_state()
        c.do_handshake()
        cert = c.get_peer_certificate()
        print("Is Expired: ", cert.has_expired())
        print("Issuer: ", cert.get_issuer())
        subject_list = cert.get_subject().get_components()
        cert_byte_arr_decoded = {}
        for item in subject_list:
            cert_byte_arr_decoded.update({item[0].decode('utf-8'): item[1].decode('utf-8')})
        print(cert_byte_arr_decoded)
        if len(cert_byte_arr_decoded) > 0:
            print("Subject: ", cert_byte_arr_decoded)
        if cert_byte_arr_decoded["CN"]:
            print("Common Name: ", cert_byte_arr_decoded["CN"])
        end_date = datetime.strptime(str(cert.get_notAfter().decode('utf-8')), "%Y%m%d%H%M%SZ")
        print("Not After (UTC Time): ", end_date)
        diff = end_date - datetime.now()
        print('Summary: "{}" SSL certificate expires on {} i.e. {} days.'.format(host, end_date, diff.days))
        c.shutdown()
        s.close()
except:
    print("Connection to {} failed.".format(host))  

此脚本需要 Python 3 和 pyOpenSSL。

于 2020-07-28T16:30:07.790 回答
0

更清洁(-ish)的解决方案,基于以前非常好的答案!

  1. 需要在覆盖 HTTPResponse 类之前修补 requests.Adapter 源文件(挂起的拉取请求:https ://github.com/psf/requests/pull/6039 ):
    • 将静态类变量添加到类 HTTPAdapter(BaseAdapter)_clsHTTPResponse = HTTPResponse
    • 修改send()方法以使用 _clsHTTPResponse 而不是直接创建 HTTPResponse 对象: resp = _clsHTTPResponse.from_httplib(...
  2. 使用此代码:
"""
Subclassing HTTP / requests to get peer_certificate back from lower levels
"""
from typing import Optional, Mapping, Any
from http.client import HTTPSConnection
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from urllib3.poolmanager import PoolManager,key_fn_by_scheme
from urllib3.connectionpool import HTTPSConnectionPool,HTTPConnectionPool
from urllib3.connection import HTTPSConnection,HTTPConnection
from urllib3.response import HTTPResponse as URLLIB3_HTTPResponse

#force urllib3 to use pyopenssl
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()  

class HTTPSConnection_withcert(HTTPSConnection):
    def __init__(self, *args, **kw):
        self.peer_certificate = None
        super().__init__(*args, **kw)
    def connect(self):
        res = super().connect() 
        self.peer_certificate = self.sock.connection.get_peer_certificate()
        return res

class HTTPResponse_withcert(URLLIB3_HTTPResponse):
    def __init__(self, *args, **kwargs):
        self.peer_certificate = None
        res = super().__init__( *args, **kwargs)
        self.peer_certificate = self._connection.peer_certificate
        return res
       
class HTTPSConnectionPool_withcert(HTTPSConnectionPool):
    ConnectionCls   = HTTPSConnection_withcert
    ResponseCls     = HTTPResponse_withcert
    
class PoolManager_withcert(PoolManager): 
    def __init__(
        self,
        num_pools: int = 10,
        headers: Optional[Mapping[str, str]] = None,
        **connection_pool_kw: Any,
    ) -> None:   
        super().__init__(num_pools,headers,**connection_pool_kw)
        self.pool_classes_by_scheme = {"http": HTTPConnectionPool, "https": HTTPSConnectionPool_withcert}
        self.key_fn_by_scheme = key_fn_by_scheme.copy()
                
class HTTPAdapter_withcert(HTTPAdapter):
    _clsHTTPResponse = HTTPResponse_withcert
    def build_response(self, request, resp):
        response = super().build_response( request, resp)
        response.peer_certificate = resp.peer_certificate
        return response

    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
        #do not call super() to not initialize PoolManager twice
        # save these values for pickling
        self._pool_connections  = connections
        self._pool_maxsize      = maxsize
        self._pool_block        = block

        self.poolmanager        = PoolManager_withcert(num_pools=connections, 
                                                   maxsize=maxsize,
                                                   block=block, 
                                                   strict=True, 
                                                   **pool_kwargs)
class Session_withcert(Session):
    def __init__(self):
        super().__init__()
        self.mount('https://', HTTPAdapter_withcert())
  1. 就这样 !您现在可以像基础会话一样使用新会话 Session_withcert(),但您也可以这样做:
ss= Session_withcert()
resp=ss.get("https://www.google.fr")
resp.peer_certificate.get_subject()
print(resp.peer_certificate.get_subject())

这将输出:

<X509Name object '/CN=*.google.fr'>
于 2022-01-09T14:15:20.490 回答