
I am trying to process concurrent inserts/updates into a Redshift database using a Python script on AWS Glue. I am using the pg8000 library for all of my database operations. The concurrent inserts/updates fail with the error `Error Name:1023, Error State:XX000`. While researching the error, I found that it is related to Serializable Isolation.

Could anyone review the code and make sure no conflicts occur while the inserts/updates are happening?

I tried using random sleep times in the calling class. That works in a few cases, but as the number of workers increases, the insert/update cases start to fail.

    import sys
    import time
    import concurrent.futures
    import pg8000
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','REDSHIFT_HOST','REDSHIFT_PORT','REDSHIFT_DB','REDSHIFT_USER_NAME','REDSHIFT_USER_PASSWORD'])

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    job_run_id = args['JOB_RUN_ID']
    maximum_workers = 5

    def executeSql(sqlStmt):
      conn = pg8000.connect(database=args['REDSHIFT_DB'],user=args['REDSHIFT_USER_NAME'],password=args['REDSHIFT_USER_PASSWORD'],host=args['REDSHIFT_HOST'],port=int(args['REDSHIFT_PORT']))
      conn.autocommit = True
      cur = conn.cursor()
      cur.execute(sqlStmt)
      cur.close()
      conn.close()


    def executeSqlProcedure(procedureName, procedureArgs = ""):
        try:
            logProcStrFormat  = "CALL table_insert_proc('{}','{}','{}','{}',{},{})"
            #Insert into the log table - create the record
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'pending','','getdate()','null')) #Code fails here
            #Executing the procedure
            procStrFormat = "CALL {}({})"
            executeSql(procStrFormat.format(procedureName,procedureArgs))
            print("Printing from {} process at ".format(procedureName),time.ctime())
            #Update the record in log table to complete
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'complete','','null','getdate()')) #Code fails here
        except Exception as e:
            # pg8000 exceptions carry a dict of error fields; 'M' holds the message text.
            # e.message does not exist in Python 3, so fall back to str(e).
            errorMsg = str(e.args[0]["M"]) if e.args and isinstance(e.args[0], dict) else str(e)
            executeSql(logProcStrFormat.format(job_run_id,procedureName,'failure',errorMsg,'null','getdate()'))
            raise  # re-raise so the Glue job run fails; sys.exit(1) after raise was unreachable


    def runDims():
      dimProcedures = ["test_proc1","test_proc2","test_proc3","test_proc4","test_proc5"]

      with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, dimProcedures))


    def runFacts():
      factProcedures = ["test_proc6","test_proc7","test_proc8","test_proc9"]

      with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, factProcedures))    


    runDims()
    runFacts()

I expect the inserts/updates into the log table to happen without locking/erroring.


1 Answer


Amazon Redshift is not designed for large numbers of small INSERT statements.

Use a Multi-Row Insert - Amazon Redshift

If a COPY command is not an option and you require SQL inserts, use a multi-row insert whenever possible. Data compression is inefficient when you add data only one row or a few rows at a time.

Multi-row inserts improve performance by batching up a series of inserts. The following example inserts three rows into a four-column table using a single INSERT statement. This is still a small insert, shown simply to illustrate the syntax of a multi-row insert.

insert into category_stage values
(default, default, default, default),
(20, default, 'Country', default),
(21, 'Concerts', 'Rock', default);
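In Python, the batching described above can be sketched as a small helper that collapses many single-row inserts into one multi-row statement. The table and rows below are just the example data from the answer; real code should prefer parameterized queries over string formatting:

```python
# Sketch: collect rows and emit one multi-row INSERT instead of many
# single-row statements. Values are quoted naively for illustration only.
def build_multirow_insert(table, rows):
    def quote(v):
        if v is None:
            return 'null'
        if isinstance(v, (int, float)):
            return str(v)
        return "'" + str(v).replace("'", "''") + "'"

    values = ",\n".join(
        "(" + ", ".join(quote(v) for v in row) + ")" for row in rows)
    return "insert into {} values\n{};".format(table, values)

sql = build_multirow_insert("category_stage",
                            [(20, None, 'Country', None),
                             (21, 'Concerts', 'Rock', None)])
print(sql)
```

The resulting string can be passed to the existing `executeSql()` helper as a single round trip.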

Alternatively, output the data to Amazon S3 and then perform a bulk load using the COPY command. This is far more efficient, because the load can be performed in parallel across all nodes.
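A minimal sketch of the COPY-based approach: stage the data as a CSV file in S3, then issue a single COPY statement. The bucket path and IAM role ARN below are placeholders, not values from the question:

```python
# Sketch: build a COPY statement that bulk-loads a staged S3 file.
# The S3 path and IAM role are hypothetical placeholders.
def build_copy_statement(table, s3_path, iam_role):
    return ("copy {table} from '{s3_path}' "
            "iam_role '{iam_role}' "
            "format as csv").format(table=table, s3_path=s3_path,
                                    iam_role=iam_role)

copy_sql = build_copy_statement(
    "category_stage",
    "s3://my-bucket/staging/category_stage.csv",
    "arn:aws:iam::123456789012:role/MyRedshiftRole")
print(copy_sql)
```

The statement could then be run through the same `executeSql()` helper, replacing the per-row procedure calls with one parallel load.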

answered 2019-07-19T02:45:39.800