我正在尝试在 AWS Glue 上使用 Python 脚本并发地向 Redshift 数据库执行插入/更新操作。所有数据库操作都通过 pg8000 库完成。并发插入/更新失败,并报出错误 (Error Name: 1023, Error State: XX000)。在研究该错误时,我发现它与可串行化隔离(Serializable Isolation)有关。
有人能帮忙看一下代码,确保并发执行插入/更新时不会发生冲突吗?
我尝试过在调用方加入随机的休眠时间。这在少数情况下有效,但随着工作线程(worker)数量的增加,插入/更新仍然会失败。
import sys
import time
import concurrent.futures
import pg8000
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Resolve the named job parameters (temp dir, job name and the Redshift
# connection settings) from the command line into a dict-like `args`.
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','REDSHIFT_HOST','REDSHIFT_PORT','REDSHIFT_DB','REDSHIFT_USER_NAME','REDSHIFT_USER_PASSWORD'])
# Create the Spark and Glue contexts and initialise the Glue job with them.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Run identifier written into every log-table row by executeSqlProcedure.
# NOTE(review): 'JOB_RUN_ID' is not in the option list requested above —
# presumably Glue adds it to `args` automatically; confirm.
job_run_id = args['JOB_RUN_ID']
# Thread-pool size used by runDims() and runFacts().
maximum_workers = 5
def executeSql(sqlStmt):
    """Execute one SQL statement on a fresh, autocommitting connection.

    A new pg8000 connection is opened for every call from the
    ``REDSHIFT_*`` entries of the module-level ``args``, so concurrent
    callers never share a connection or cursor.

    Args:
        sqlStmt: SQL text, passed to the cursor unchanged.

    Raises:
        Any exception raised by pg8000 while connecting or executing.
        The cursor and connection are closed before it propagates.
    """
    conn = pg8000.connect(
        database=args['REDSHIFT_DB'],
        user=args['REDSHIFT_USER_NAME'],
        password=args['REDSHIFT_USER_PASSWORD'],
        host=args['REDSHIFT_HOST'],
        port=int(args['REDSHIFT_PORT']),
    )
    # try/finally so a failing statement cannot leak the connection; the
    # original skipped both close() calls whenever execute() raised.
    try:
        conn.autocommit = True
        cur = conn.cursor()
        try:
            cur.execute(sqlStmt)
        finally:
            cur.close()
    finally:
        conn.close()
def _describeDbError(exc):
    """Return a readable message for *exc*, preferring the server's "M" field."""
    detail = exc.args[0] if exc.args else None
    # Handles an error whose first argument is a dict of server fields with
    # the message under "M" (the shape the original e.message["M"] relied on);
    # anything else falls back to str(exc).
    if isinstance(detail, dict) and "M" in detail:
        return str(detail["M"])
    return str(exc)


def executeSqlProcedure(procedureName, procedureArgs = ""):
    """Run a stored procedure and record its status in the log table.

    Writes a 'pending' row via ``table_insert_proc``, calls the procedure,
    then writes a 'complete' row. If any of those steps fails, a 'failure'
    row carrying the error message is attempted and the original exception
    is re-raised.

    Args:
        procedureName: Name of the stored procedure to CALL.
        procedureArgs: Argument list text placed between the parentheses
            of the CALL statement; empty by default.

    Raises:
        Exception: The error from the failing step. A secondary error
            while writing the 'failure' row is printed, not raised, so it
            cannot hide the root cause.
    """
    logProcStrFormat = "CALL table_insert_proc('{}','{}','{}','{}',{},{})"
    try:
        # Insert into the log table - create the record.
        executeSql(logProcStrFormat.format(job_run_id, procedureName, 'pending', '', 'getdate()', 'null'))
        # Execute the procedure itself.
        executeSql("CALL {}({})".format(procedureName, procedureArgs))
        print("Printing from {} process at ".format(procedureName), time.ctime())
        # Update the record in the log table to complete.
        executeSql(logProcStrFormat.format(job_run_id, procedureName, 'complete', '', 'null', 'getdate()'))
    except Exception as e:
        # Python 3 exceptions have no `.message`; the original lookup raised
        # AttributeError here and masked the real error.
        # Double single quotes so the message cannot terminate the SQL
        # string literal it is embedded in.
        errorMsg = _describeDbError(e).replace("'", "''")
        try:
            executeSql(logProcStrFormat.format(job_run_id, procedureName, 'failure', errorMsg, 'null', 'getdate()'))
        except Exception as logError:
            print("Could not record failure of {} in the log table: {}".format(procedureName, logError))
        # Re-raise the original error so the caller (and the thread pool)
        # sees the step that actually failed.
        raise
def runDims():
    """Run every dimension procedure through a thread pool.

    Each name is handed to ``executeSqlProcedure`` on a pool of
    ``maximum_workers`` threads. Consuming the result iterator re-raises
    the first worker exception in submission order.
    """
    procedure_names = ["test_proc1","test_proc2","test_proc3","test_proc4","test_proc5"]
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers)
    with pool:
        # The results themselves are not used; the iterator is drained only
        # so that worker exceptions surface here.
        for _ in pool.map(executeSqlProcedure, procedure_names):
            pass
def runFacts():
    """Run every fact procedure through a thread pool.

    Each name is handed to ``executeSqlProcedure`` on a pool of
    ``maximum_workers`` threads. Consuming the result iterator re-raises
    the first worker exception in submission order.
    """
    procedure_names = ["test_proc6","test_proc7","test_proc8","test_proc9"]
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers)
    with pool:
        # The results themselves are not used; the iterator is drained only
        # so that worker exceptions surface here.
        for _ in pool.map(executeSqlProcedure, procedure_names):
            pass
# Script entry point: run the dimension procedures, then the fact
# procedures. runFacts() is only called after runDims() has returned.
# NOTE(review): job.commit() is not called anywhere in the visible code —
# confirm whether that is intentional.
runDims()
runFacts()
我希望对日志表的插入/更新能够正常执行,而不会出现锁冲突或报错。