0

我们有一个基于 S3 数据构建的 Redshift Spectrum 表——我们正在尝试在该表中自动添加分区——我可以在 redshift 客户端或 psql shell 中运行以下 ALTER 语句:

ALTER TABLE analytics_spectrum.page_view ADD PARTITION(date='2017-10-17') LOCATION 's3://data-hub/page_view/2017/10/17/';

但这无法通过 psycopg2 执行。

sql_query = "ALTER TABLE analytics_spectrum.page_view ADD PARTITION(date='2017-10-17') LOCATION 's3://data-hub/_page_view_v3/2017/10/17/';"
import config
import psycopg2
connection = psycopg2.connect(
            **config.DATABASES['redshift_db']["connection"])
cursor = connection.cursor()
cursor.execute(sql_query)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
psycopg2.ProgrammingError: syntax error at or near "("
LINE 1: ...ABLE analytics_spectrum.page_view ADD PARTITION(date='201...

在 psycopg2 的情况下,它甚至不会将查询发送到 redshift,并且在查询解析中执行失败。

现在我已经实现了使用 subprocess.popen 来执行 alter 语句 - 但我想将它切换回使用 psycopg2。

p = subprocess.Popen(['psql',
                      '-h', self.spectrum_connection['host'],
                      '-p', self.spectrum_connection['port'],
                      '-d', self.spectrum_connection['dbname'],
                      '-U', self.spectrum_connection['user'],
                      '-c', sql_stmt],
                     env={
    'PGPASSWORD': self.spectrum_connection['password']},
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = p.communicate()

建议/想法?

谢谢,侯赛因·博拉

4

2 回答 2

3

我遇到过同样的问题。不使用 ISOLATION_LEVEL_AUTOCOMMIT 的查询执行会引发以下错误:

psycopg2.InternalError: ALTER EXTERNAL TABLE cannot run inside a transaction block

我稍微修改了我的代码并且它起作用了。

import argparse
import sys, psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

input_data = {}
input_data["db_name"] = <<DB_NAME>>
input_data["db_host"] = <<HOST_NAME>>
input_data["db_port"] = 5439
input_data["db_user"] = <<USER>>
input_data["db_pass"] = <<PASSWORD>>
con = psycopg2.connect(dbname=input_data["db_name"], host=input_data["db_host"], port=input_data["db_port"], user=input_data["db_user"], password=input_data["db_pass"])
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
query = <<ADD_YOUR_QUERY_HERE>>
cur.execute(query)
cur.close() 
con.close()
于 2017-11-22T12:56:05.513 回答
0

在您的查询中,您必须首先添加set autocommit=on;以传递事务块。

然后魔法就会发生,你可以对你的表进行分区。

于 2019-07-05T08:26:43.393 回答