2

我想使用 Python 代码访问 AWS AppSync API 并与请求库混淆。

身份验证模式是 Cognito 用户池。我的问题是:

  1. 如何从 Cognito 用户池获取访问令牌?

  2. 如何进行查询、突变和处理订阅?

我尝试使用身份验证模式 API 密钥来做到这一点。但我收到以下错误。

import requests
import json

URL = "https://vtcarmq7zzeadnkwzcgfr24irm.appsync-api.us-east-1.amazonaws.com/graphql"

headers = {"x-api-key":"da2-bwuyzqchhfgyxemcmdinjegb7e"}

data = json.dumps({
    "query": '''
    
  listTodos(filter:{
    title:{
      contains:"g"
    }     
  }   )     {
    items{
      id title duedate     
    }   
  }
'''

} )

r = requests.request("POST", URL , data = data , headers = headers)

print(r.text)

{ "errors" : [ { "message" : "无法解析 GraphQL 查询。", "errorType" : "MalformedHttpRequestException" } ] }

我看过这个视频https://www.youtube.com/watch?v=2U4RsbFO4bA&t=1172s 在这个视频中,为了使用 cognito 用户池进行身份验证,他说要调用 cognito 用户池并获取令牌并通过它到标头中的 aws appsync。

我是 aws 和 python 请求模块的新手,正在尝试为此视频编写 python 代码。

4

1 回答 1

0

graphql-python/gql自版本 3.0.0rc0起支持 AWS AppSync 。

它支持实时端点上的查询、变异甚至订阅。

它支持 IAM、api 密钥和 JWT 身份验证方法。

文档可在此处获得

以下是使用 API 密钥身份验证的突变示例:

import asyncio
import os
import sys
from urllib.parse import urlparse

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.appsync_auth import AppSyncApiKeyAuthentication

# Uncomment the following lines to enable debug output
# import logging
# logging.basicConfig(level=logging.DEBUG)


async def main():

    # Should look like:
    # https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
    url = os.environ.get("AWS_GRAPHQL_API_ENDPOINT")
    api_key = os.environ.get("AWS_GRAPHQL_API_KEY")

    if url is None or api_key is None:
        print("Missing environment variables")
        sys.exit()

    # Extract host from url
    host = str(urlparse(url).netloc)

    auth = AppSyncApiKeyAuthentication(host=host, api_key=api_key)

    transport = AIOHTTPTransport(url=url, auth=auth)

    async with Client(
        transport=transport, fetch_schema_from_transport=False,
    ) as session:

        query = gql(
            """
mutation createMessage($message: String!) {
  createMessage(input: {message: $message}) {
    id
    message
    createdAt
  }
}"""
        )

        variable_values = {"message": "Hello world!"}

        result = await session.execute(query, variable_values=variable_values)
        print(result)


asyncio.run(main())
于 2021-12-10T09:43:33.287 回答