我正在尝试使用此类通过 json-rpc 连接到 trytond 实例
class HttpClient:
"""HTTP Client to make JSON RPC requests to Tryton server.
User is logged in when an object is created.
"""
def __init__(self, url, database_name, user, passwd):
self._url = '{}/{}/'.format(url, database_name)
self._user = user
self._passwd = passwd
self._login()
self._id = 0
def get_id(self):
self._id += 1
return self._id
def _login(self):
"""
Returns list, where
0 - user id
1 - session token
"""
payload = json.dumps({
'params': [self._user, {'password': self._passwd}],
'jsonrpc': "2.0",
'method': 'common.db.login',
'id': 1,
})
headers = {'content-type': 'application/json'}
result = requests.post(self._url, data=payload, headers=headers)
if result.status_code in [500, 401]:
raise Exception(result.text)
if 'json' in result:
self._session = result.json()['result']
else:
self._session = json.loads(result.text)
return self._session
def call(self, prefix, method, params=[[], {}]):
"""RPC Call
"""
method = '{}.{}'.format(prefix, method)
payload = json.dumps({
'params': params,
'method': method,
'id': self.get_id(),
})
authorization = base64.b64encode(self._session[1].encode('utf-8'))
headers = {
'Content-Type': 'application/json',
'Authorization': b'Session ' + authorization
}
response = requests.post(self._url, data=payload, headers=headers)
if response.status_code in [500, 401]:
raise Exception(response.text)
return response.json()
def model(self, model, method, args=[], kwargs={}):
return self.call('model.%s' % model, method, [args, kwargs])
def system(self, method):
return self.call('system', method, params=[])
我这样称呼它
def notifieToMainServer(self):
url = "http://%s:%s" % (HOST, PORT)
headers = {'content-type': 'application/json'}
client = HttpClient(url, "tryton", CREDENTIALS[0], CREDENTIALS[1])
client.model('main.register',
'ActivateService',
{'code': self.code,
'name': self.nome,
'surname': self.cognome,
'service_type': 'registry',
'service_url': '' # qui andra messa l'url di chimata
}
)
HttpClient 的创建效果很好,我可以登录,但是当我尝试调用方法 ActivateService 时,我收到了 401 响应。
我还将 ActivateService 添加到 rpc 类中
@classmethod
def __setup__(cls):
super(MainRegister, cls).__setup__()
cls.__rpc__.update({
'ActivateService': RPC(instantiate=0)})
功能就像
def ActivateService(self,
code,
name,
surname,
service_type,
service_url):
"""
activate the service for the given code and the given service
"""
我做错了什么?
最好的问候, 马特奥