
I have written a Python script that uses the boto3 library to talk to an Aurora Serverless (PostgreSQL) database. I am using the Data API to batch-insert a very large CSV file (more than 6 million records, 37 columns per record) into the database, splitting the work across multiple batches. When I run the script on my PC (with AWS CLI credentials configured so that I connect to the Aurora DB in the cloud as an authorized user), I successfully insert a few batches of 1,800 SQL insert statements each, and then I get this error:

An error occurred (413) when calling the BatchExecuteStatement operation

Python script:

import boto3
import csv
rds_client = boto3.client('rds-data')

database_name = "postgres"
db_cluster_arn = "arn:aws:rds:us-east-1:xxxxxxxx:cluster:database-1"
db_credentials_secrets_store_arn = "arn:aws:secretsmanager:us-east-1:xxxxxxxxxx:secret:rds-db-credentials/cluster-VPF7JUKVRLQMEHF4QV2HSIKELM/postgres-QVIzWC"

def batch_execute_statement(sql, sql_parameter_sets, transaction_id=None):
    parameters = {
        'secretArn': db_credentials_secrets_store_arn,
        'database': database_name,
        'resourceArn': db_cluster_arn,
        'sql': sql,
        'parameterSets': sql_parameter_sets
    }
    if transaction_id is not None:
        parameters['transactionId'] = transaction_id
    response = rds_client.batch_execute_statement(**parameters)
    return response


def get_entry(row):
    entry = [
        {'name': 'RIG_ID', 'value': {'stringValue': row['RIG_ID']}},
    ]

    # DEPTH_CAPACITY
    if row['DEPTH_CAPACITY'] == '':
        entry.append({'name': 'DEPTH_CAPACITY',
                      'value': {'isNull': True}})
    else:
        entry.append({'name': 'DEPTH_CAPACITY', 'typeHint': 'DECIMAL', 'value': {
            'stringValue': row['DEPTH_CAPACITY']}})
    # MANY MORE ENTRIES HERE
    return entry

def execute_transaction(sql, parameter_set):
    transaction = rds_client.begin_transaction(secretArn=db_credentials_secrets_store_arn,resourceArn=db_cluster_arn,database=database_name)
    try:
        response = batch_execute_statement(sql, parameter_set, transaction['transactionId'])
    except Exception as e:
        transaction_response = rds_client.rollback_transaction(
            secretArn=db_credentials_secrets_store_arn,
            resourceArn=db_cluster_arn,
            transactionId=transaction['transactionId'])
    else:
        transaction_response = rds_client.commit_transaction(
            secretArn=db_credentials_secrets_store_arn,
            resourceArn=db_cluster_arn,
            transactionId=transaction['transactionId'])
        print(f'Number of records updated: {len(response["updateResults"])}')
    print(f'Transaction Status: {transaction_response["transactionStatus"]}')

batch_size = 1800
current_batch_size = 0
transaction_count = 0

sql = 'INSERT INTO T_RIG_ACTIVITY_STATUS_DATE VALUES (\
:RIG_ID, :DEPTH_CAPACITY, # MANY MORE ENTRIES HERE);'
    
parameter_set = []

with open('LARGE_FILE.csv', 'r') as file:
    reader = csv.DictReader(file, delimiter=',')
    
    for row in reader:

        entry = get_entry(row)
        parameter_set.append(entry)
        current_batch_size = current_batch_size + 1

        if current_batch_size == batch_size:
            execute_transaction(sql, parameter_set)

            transaction_count = transaction_count + 1
            print(f'Transaction count: {transaction_count}')

            current_batch_size = 0
            parameter_set.clear()

    # flush any rows left over after the last full batch
    if parameter_set:
        execute_transaction(sql, parameter_set)

As I understand it, error code 413 means "Request Entity Too Large", which suggests I need to reduce the batch size I send to the database through the Data API. But I don't understand why I was able to send several batches of 1,800 SQL statements before the error started appearing. Any suggestions?

Also, given the amount of data I need to push/insert into the database, what would be the best solution in my case?


1 Answer


Lowering the batch size solved it for me. In my case, some of the records in my CSV contained more data than others, and those wider rows are what pushed the request over the limit and caused the error.
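
If a fixed row count still overruns the request limit whenever a run of unusually wide rows comes along, one option is to also cap each batch by a rough estimate of its serialized size. The sketch below is only an illustration: it reuses the sql, get_entry() and execute_transaction() definitions from the question, and MAX_BATCH_BYTES / MAX_BATCH_ROWS are arbitrary budgets chosen to stay well under the per-request limit, not documented Data API values.

import csv
import json

MAX_BATCH_BYTES = 512 * 1024   # assumed payload budget per BatchExecuteStatement call
MAX_BATCH_ROWS = 1000          # hard cap on rows per call

def flush(parameter_set):
    """Send the accumulated parameter sets in one transaction and reset the list."""
    if parameter_set:
        execute_transaction(sql, parameter_set)
        parameter_set.clear()

with open('LARGE_FILE.csv', 'r') as file:
    reader = csv.DictReader(file, delimiter=',')
    parameter_set = []
    batch_bytes = 0

    for row in reader:
        entry = get_entry(row)
        # Approximate the wire size of this row by its JSON-serialized length.
        entry_bytes = len(json.dumps(entry))

        # Flush first if adding this row would push the batch over budget.
        if parameter_set and (batch_bytes + entry_bytes > MAX_BATCH_BYTES
                              or len(parameter_set) >= MAX_BATCH_ROWS):
            flush(parameter_set)
            batch_bytes = 0

        parameter_set.append(entry)
        batch_bytes += entry_bytes

    flush(parameter_set)   # final partial batch

Estimating the payload with json.dumps is crude but cheap; the point is simply that a stretch of wide records triggers an earlier flush instead of a 413.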

Answered on 2020-12-03T08:14:26.243