0

我正在尝试构建一个需要对 Apache Pulsar 服务进行管理操作的工具。出于某种原因,他们决定不对 REST API 主体中的参数使用纯 JSON,而是似乎使用 Jackson JSON 序列化。我开发的工具是用 Python 3.6 编写的,我正在寻找将简单数据结构编码为 Jackson JSON 序列化格式的方法,甚至寻找序列化格式的规范。是否存在此类文档或 Python 代码?由于需要序列化的典型数据结构很简单,例如 a Set<AuthActions>,并且AuthActions是一个枚举,如果已知的话,将这些东西直接手动编码成序列化格式是可行的。

用代码示例编辑:

import asyncio
import aiohttp
import ssl
import os

async def go(loop):
    current_dir = os.path.abspath(os.path.dirname(__file__))
    sslcontext = ssl.create_default_context(cafile=os.path.join(current_dir, 'cacert.pem'))
    sslcontext.load_cert_chain(os.path.join(current_dir, 'super-cert.pem'),
                               os.path.join(current_dir, 'super-key.pem'))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.post('https://localhost:8081/admin/namespaces/sample/standalone/ns1/permissions/testrole',
                                 json={'actions': [0, 1]}, ssl=sslcontext) as resp:
             print(resp.status)
             print(await resp.text())
        async with session.get('https://localhost:8081/admin/persistent/sample/standalone/ns1', ssl=sslcontext) as resp:
            print(resp.status)
            print(await resp.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
4

1 回答 1

2

事实证明,这只是 Apache Pulsar 管理界面中缺少文档。这是一个工作示例:

import asyncio
import aiohttp
import ssl
import os

async def go(loop):
    current_dir = os.path.abspath(os.path.dirname(__file__))
    sslcontext = ssl.create_default_context(cafile=os.path.join(current_dir, 'cacert.pem'))
    sslcontext.load_cert_chain(os.path.join(current_dir, 'super-cert.pem'),
                               os.path.join(current_dir, 'super-key.pem'))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.post('https://localhost:8081/admin/namespaces/sample/standalone/ns1/permissions/testrole',
                                 json=[0, 1], ssl=sslcontext) as resp:
             print(resp.status)
             print(await resp.text())
        async with session.get('https://localhost:8081/admin/persistent/sample/standalone/ns1', ssl=sslcontext) as resp:
            print(resp.status)
            print(await resp.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
于 2018-04-25T07:18:52.163 回答