作为一个完整的例子:
from twisted.trial import unittest
from urllib import urlencode
from base64 import b64encode
from twisted.python.log import err
from twisted.web.client import Agent, readBody
from twisted.internet import reactor
from twisted.internet.ssl import ClientContextFactory
from twisted.web.http_headers import Headers
from zope.interface import implements
from twisted.internet.defer import succeed
from twisted.web.iweb import IBodyProducer
class StringProducer(object):
implements(IBodyProducer)
def __init__(self, body):
self.body = body
self.length = len(body)
def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
class WebClientContextFactory(ClientContextFactory):
def getContext(self, hostname, port):
return ClientContextFactory.getContext(self)
class HttpsClientTestCases(unittest.TestCase):
def test_https_client(self):
def cbRequest(response):
print 'Response version:', response.version
print 'Response code:', response.code
print 'Response phrase:', response.phrase
print 'Response headers:[{}]'.format(list(response.headers.getAllRawHeaders()))
d = readBody(response)
d.addCallback(cbBody)
return d
def cbBody(body):
print 'Response body:'
print body
contextFactory = WebClientContextFactory()
agent = Agent(reactor, contextFactory)
authorization = b64encode(b"username:password")
data = StringProducer({'hello': 'world'})
d = agent.request(
'POST',
'https://....',
headers = Headers(
{
'Content-Type': ['application/x-www-form-urlencoded'],
'Authorization': ['Basic ' + authorization]
}
),
bodyProducer = data
)
d.addCallbacks(cbRequest, err)
d.addCallback(lambda ignored: reactor.stop())
return d