0

我正在尝试使用此类通过 json-rpc 连接到 trytond 实例

class HttpClient:
    """HTTP Client to make JSON RPC requests to Tryton server.
    User is logged in when an object is created.
    """

    def __init__(self, url, database_name, user, passwd):
        self._url = '{}/{}/'.format(url, database_name)
        self._user = user
        self._passwd = passwd
        self._login()
        self._id = 0

    def get_id(self):
        self._id += 1
        return self._id

    def _login(self):
        """
        Returns list, where
        0 - user id
        1 - session token
        """
        payload = json.dumps({
            'params': [self._user, {'password': self._passwd}],
            'jsonrpc': "2.0",
            'method': 'common.db.login',
            'id': 1,
        })
        headers = {'content-type': 'application/json'}
        result = requests.post(self._url, data=payload, headers=headers)
        if result.status_code in [500, 401]:
            raise Exception(result.text)

        if 'json' in result:
            self._session = result.json()['result']
        else:
            self._session = json.loads(result.text)

        return self._session

    def call(self, prefix, method, params=[[], {}]):
        """RPC Call
        """
        method = '{}.{}'.format(prefix, method)
        payload = json.dumps({
            'params': params,
            'method': method,
            'id': self.get_id(),

        })

        authorization = base64.b64encode(self._session[1].encode('utf-8'))
        headers = {
        'Content-Type': 'application/json',
        'Authorization': b'Session ' + authorization
        }

        response = requests.post(self._url, data=payload, headers=headers)
        if response.status_code in [500, 401]:
            raise Exception(response.text)
        return response.json()

    def model(self, model, method, args=[], kwargs={}):
        return self.call('model.%s' % model, method, [args, kwargs])

    def system(self, method):
        return self.call('system', method, params=[])

我这样称呼它

def notifieToMainServer(self):
    url = "http://%s:%s" % (HOST, PORT)
    headers = {'content-type': 'application/json'}
    client = HttpClient(url, "tryton", CREDENTIALS[0], CREDENTIALS[1])   
    client.model('main.register',
                 'ActivateService', 
                 {'code': self.code,
                  'name': self.nome,
                  'surname': self.cognome,
                  'service_type': 'registry',
                  'service_url': '' # qui andra messa l'url di chimata
                  }
                 )

HttpClient 的创建效果很好,我可以登录,但是当我尝试调用方法 ActivateService 时,我收到了 401 响应。

我还将 ActivateService 添加到 rpc 类中

@classmethod
def __setup__(cls):
    super(MainRegister, cls).__setup__()
    cls.__rpc__.update({    
            'ActivateService': RPC(instantiate=0)})  

功能就像

def ActivateService(self,
                    code,
                    name,
                    surname,
                    service_type,
                    service_url):
    """
        activate the service for the given code and the given service
    """

我做错了什么?

最好的问候, 马特奥

4

2 回答 2

1

授权标头应为:

'Authorization': : b'Bearer' + 授权

于 2020-02-18T22:45:14.190 回答
0

在和我终于解决了这个修改类的问题

class HttpClient:
    """HTTP Client to make JSON RPC requests to Tryton server.
    User is logged in when an object is created.
    """

    def __init__(self, url, database_name, user, passwd):
        self._url = '{}/{}/'.format(url, database_name)
        self._user = user
        self._passwd = passwd
        self._login()
        self._id = 0

    def get_id(self):
        self._id += 1
        return self._id

    def _login(self):
        """
        Returns list, where
        0 - user id
        1 - session token
        """
        payload = json.dumps({
            'params': [self._user, {'password': self._passwd}],
            'jsonrpc': "2.0",
            'method': 'common.db.login',
            'id': 1,
        })
        headers = {'content-type': 'application/json'}
        result = requests.post(self._url,
                               data=payload,
                               headers=headers)
        if result.status_code in [500, 401]:
            raise Exception(result.text)

        if 'json' in result:
            self._session = result.json()['result']
        else:
            self._session = json.loads(result.text)

        return self._session

    def call(self, prefix, method, params=[[], {}, {}]):
        """RPC Call
        """
        method = '{}.{}'.format(prefix, method)
        payload = json.dumps({
            'params': params,
            'method': method,
            'id': self.get_id(),
         })
        auth = "%s:%s:%s" % (self._user, self._session[0], self._session[1])
        authorization = base64.b64encode(auth.encode('utf-8'))
        headers = {
        'Content-Type': 'application/json',
        'Authorization': b'Session ' + authorization
        }

        response = requests.post(self._url,
                                 data=payload,
                                 headers=headers)
        if response.status_code in [500, 401]:
            raise Exception(response.text)
        return response.json()

    def model(self, model, method, args=[], kwargs={}):
        return self.call('model.%s' % model, method, (args, kwargs, {}))

    def system(self, method):
        return self.call('system', method, params=[])

关键是必须在上下文中完成的调用才能以正确的方式执行!

    def model(self, model, method, args=[], kwargs={}):
        return self.call('model.%s' % model, method, (args, kwargs, {}))

希望它有助于问候

于 2020-02-20T08:12:26.493 回答