0

我为不同的团队开发了不同的 Athena 工作组,以便我可以将他们的查询和他们的查询结果分开。用户希望从他们的笔记本实例 (JupyterLab) 中查询他们可用的表。我很难找到成功满足从用户特定工作组查询表的要求的代码。我只找到了将从主工作组查询表的代码。

我目前使用的代码添加在下面。

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
region_name='<YOUR REGION, for example, us-west-2>')


df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)
df

此代码不起作用,因为用户只能从其特定工作组执行查询,因此在运行此代码时会出错。它也不涵盖在用户特定工作组中分离用户查询的要求。

关于如何添加更改代码以便我可以从笔记本实例在特定工作组中运行查询的任何建议?

4

1 回答 1

2

的文档pyathena不是非常广泛,但在查看源代码后我们可以看到它connect只是创建了类的实例Connection

def connect(*args, **kwargs):
    from pyathena.connection import Connection
    return Connection(*args, **kwargs)

现在,在查看GitHub 上Connection.__init__的签名后,我们可以看到参数名称与官方AWS Python API中的参数之一相同。以下是他们的文档所说的:work_group=Nonestart_query_executionboto3

WorkGroup (string) -- 正在其中启动查询的工作组的名称。

在完成使用和导入之后,Connection我们最终得到了BaseCursor类,该类在start_query_execution使用由方法组装的参数解包字典时在后台进行调用BaseCursor._build_start_query_execution_request。这就是我们可以看到向 ​​AWS Athena 提交查询的熟悉语法的地方,特别是以下部分:

if self._work_group or work_group:
    request.update({
        'WorkGroup': work_group if work_group else self._work_group
    })

所以这应该对你的情况有用:

import pandas as pd
from pyathena import connect


conn = connect(
    s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
    region_name='<YOUR REGION, for example, us-west-2>',
    work_group='<USER SPECIFIC WORKGROUP>'
)

df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)
于 2019-08-16T13:56:13.253 回答