0

当我从http://twistedmatrix.com/documents/13.2.0/web/howto/client.html#auto4<HTML><HEAD><TITLE>401 Unauthorized</TITLE></HEAD>运行代码时出现错误,但我不知道如何在请求中添加身份验证。

更新

我将我的代码更新为:

from base64 import b64encode
authorization = b64encode(b"admin:admin")

d = agent.request(
    'GET',
    'http://172.19.1.76/',
    Headers(
        {
            'User-Agent': ['Twisted Web Client Example'],
            b"authorization": b"Basic " + authorization
        }        
    ),
    None)

但是出现以下错误,但我不知道应该提供列表中的哪些内容。

packages/twisted/web/http_headers.py", line 199, in setRawHeaders
    "instance of %r instead" % (name, type(values)))
TypeError: Header entry 'authorization' should be list but found instance of <type 'str'> instead

更新

请求的内容应该在一个列表中,如下所示:

"authorization": ["Basic " + authorization]
4

3 回答 3

4

您可以向使用发送的请求添加标头Agent(注意您链接到的示例的第 29 行)。

例如,要执行基本的身份验证/授权,您可以尝试以下操作:

from base64 import b64encode
authorization = b64encode(b"username:password")
getting = agent.request(..., Headers({b"authorization": [b"Basic " + authorization]}))
于 2014-05-27T12:00:41.180 回答
2

您可以使用treq's authentication support,如下所示:

d = treq.get(
    'http://...',
    auth=('username', 'password')
)
于 2014-05-27T18:51:54.267 回答
0

作为一个完整的例子:

from twisted.trial import unittest   
from urllib import urlencode
from base64 import b64encode
from twisted.python.log import err
from twisted.web.client import Agent, readBody
from twisted.internet import reactor
from twisted.internet.ssl import ClientContextFactory
from twisted.web.http_headers import Headers
from zope.interface import implements
from twisted.internet.defer import succeed
from twisted.web.iweb import IBodyProducer


class StringProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass

class WebClientContextFactory(ClientContextFactory):
    def getContext(self, hostname, port):
        return ClientContextFactory.getContext(self)

class HttpsClientTestCases(unittest.TestCase):
    def test_https_client(self):
        def cbRequest(response):
            print 'Response version:', response.version
            print 'Response code:', response.code
            print 'Response phrase:', response.phrase
            print 'Response headers:[{}]'.format(list(response.headers.getAllRawHeaders()))
            d = readBody(response)
            d.addCallback(cbBody)
            return d

        def cbBody(body):
            print 'Response body:'
            print body

        contextFactory = WebClientContextFactory()
        agent = Agent(reactor, contextFactory)

        authorization = b64encode(b"username:password")
        data = StringProducer({'hello': 'world'})
        d = agent.request(
            'POST',
            'https://....',
            headers = Headers(
                {
                    'Content-Type': ['application/x-www-form-urlencoded'],
                    'Authorization': ['Basic ' + authorization]
                }
            ),
            bodyProducer = data
        )

        d.addCallbacks(cbRequest, err)
        d.addCallback(lambda ignored: reactor.stop())
        return d
于 2015-09-25T16:06:20.077 回答