0

我有一个如下的 dask 表达式,我试图以分布式方式运行 sqlalchemy 查询。connect_args但是,它引用了在参数中输入的 .pem 密钥文件。如何将此密钥文件上传到 dask 集群/工作人员,以便它允许我运行此 sqlalchemy 查询?

def execute_query(q):
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                               connect_args={'protocol': 'https',
                                             'requests_kwargs': {'verify': key}})
    return pd.read_sql(q, conn)

df = dd.from_delayed([
    delayed(execute_query)(q) for q in queries])

我尝试使用client.upload_file将本地文件发送到集群,但它抱怨它无法找到 .pem 密钥的路径

OSError: Could not find a suitable TLS CA certificate bundle, invalid path: hdsj1ptc001.pem
4

1 回答 1

1

虽然 Dask 可以为您处理一些文件操作(请参阅 参考资料client.upload_file),但您应该使用自己的方法将敏感文件(例如凭证)分发到工作文件系统上的特定位置。选项包括scp、kubernetes secrets 和许多其他方法。

如果您确定集群的安全性,您可以在函数的参数中包含密钥文件,然后将其写入函数中的文件(见下文),或者,如果调用允许,直接传递字节。

def execute_query(q, key):
    if not os.path.exists(keyfile):   # if the data needs to be in a file
        open(keyfile, 'wb').write(key)
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                               connect_args={'protocol': 'https',
                                             'requests_kwargs': {'verify': keyfile}})
    return pd.read_sql(q, conn)

key = dask.delayed(open('keyfile.pem', 'rb').read())
df = dd.from_delayed([
    delayed(execute_query)(q, key) for q in queries])
于 2020-04-27T15:14:32.017 回答