I wrote a Python script that uses the boto3 library to query an Aurora Serverless (PostgreSQL) database. I am using the Data API to bulk insert a very large CSV file (over 6 million records, 37 columns each) into the database, in multiple batches. When I run the script on my PC (I have AWS CLI credentials configured so that I talk to the Aurora DB in the cloud as an authorized user), I can successfully insert a few batches of 1800 SQL insert statements each, and then I get this error:
An error occurred (413) when calling the BatchExecuteStatement operation
Python script:
import boto3
import csv

rds_client = boto3.client('rds-data')

database_name = "postgres"
db_cluster_arn = "arn:aws:rds:us-east-1:xxxxxxxx:cluster:database-1"
db_credentials_secrets_store_arn = "arn:aws:secretsmanager:us-east-1:xxxxxxxxxx:secret:rds-db-credentials/cluster-VPF7JUKVRLQMEHF4QV2HSIKELM/postgres-QVIzWC"
def batch_execute_statement(sql, sql_parameter_sets, transaction_id=None):
    parameters = {
        'secretArn': db_credentials_secrets_store_arn,
        'database': database_name,
        'resourceArn': db_cluster_arn,
        'sql': sql,
        'parameterSets': sql_parameter_sets
    }
    if transaction_id is not None:
        parameters['transactionId'] = transaction_id
    response = rds_client.batch_execute_statement(**parameters)
    return response
def get_entry(row):
    entry = [
        {'name': 'RIG_ID', 'value': {'stringValue': row['RIG_ID']}}
    ]
    # DEPTH_CAPACITY
    if row['DEPTH_CAPACITY'] == '':
        entry.append({'name': 'DEPTH_CAPACITY',
                      'value': {'isNull': True}})
    else:
        entry.append({'name': 'DEPTH_CAPACITY', 'typeHint': 'DECIMAL',
                      'value': {'stringValue': row['DEPTH_CAPACITY']}})
    # MANY MORE ENTRIES HERE
    return entry
def execute_transaction(sql, parameter_set):
    transaction = rds_client.begin_transaction(
        secretArn=db_credentials_secrets_store_arn,
        resourceArn=db_cluster_arn,
        database=database_name)
    try:
        response = batch_execute_statement(
            sql, parameter_set, transaction['transactionId'])
    except Exception as e:
        transaction_response = rds_client.rollback_transaction(
            secretArn=db_credentials_secrets_store_arn,
            resourceArn=db_cluster_arn,
            transactionId=transaction['transactionId'])
    else:
        transaction_response = rds_client.commit_transaction(
            secretArn=db_credentials_secrets_store_arn,
            resourceArn=db_cluster_arn,
            transactionId=transaction['transactionId'])
        print(f'Number of records updated: {len(response["updateResults"])}')
    print(f'Transaction Status: {transaction_response["transactionStatus"]}')
batch_size = 1800
current_batch_size = 0
transaction_count = 0

sql = 'INSERT INTO T_RIG_ACTIVITY_STATUS_DATE VALUES (\
:RIG_ID, :DEPTH_CAPACITY);'  # MANY MORE ENTRIES HERE

parameter_set = []
with open('LARGE_FILE.csv', 'r') as file:
    reader = csv.DictReader(file, delimiter=',')
    for row in reader:
        entry = get_entry(row)
        if current_batch_size == batch_size:
            execute_transaction(sql, parameter_set)
            transaction_count += 1
            print(f'Transaction count: {transaction_count}')
            current_batch_size = 0
            parameter_set.clear()
        else:
            parameter_set.append(entry)
            current_batch_size += 1
As far as I understand, error code 413 means "Request Entity Too Large", which suggests I need to reduce the batch size I send to the database (via the Data API). But I don't understand why I was able to send several batches of 1800 SQL statements before I started getting that error. Any suggestions?
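One workaround I'm considering is to cap each batch by its serialized payload size rather than a fixed row count, on the theory that some rows are much wider than others and a few batches happen to exceed the endpoint's request limit. A rough sketch (the 1 MB threshold is my own guess, not a documented limit, and `json.dumps` only approximates what boto3 actually puts on the wire):

```python
import json

# Assumed request-size cap; tune empirically, since the actual
# limit behind the 413 response is not documented precisely.
MAX_PAYLOAD_BYTES = 1_000_000

def flush_by_size(entries, send_batch, max_bytes=MAX_PAYLOAD_BYTES):
    """Group parameter sets so each batch's estimated JSON size
    stays under max_bytes, then hand each batch to send_batch
    (e.g. my execute_transaction wrapper)."""
    batch, batch_bytes = [], 0
    for entry in entries:
        entry_bytes = len(json.dumps(entry))
        if batch and batch_bytes + entry_bytes > max_bytes:
            send_batch(batch)
            batch, batch_bytes = [], 0
        batch.append(entry)
        batch_bytes += entry_bytes
    if batch:
        send_batch(batch)  # flush the final partial batch
```

Would this kind of size-based batching be a reasonable fix, or is the limit on something other than the request body?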
Also, given the amount of data I need to push/insert into the database, what would be the best approach in my case?