28

以下是关于我如何设置的一些要点:

  • 我已将 CSV 文件上传到 S3,并设置了 Glue 爬虫来创建表和架构。
  • 我有一个 Glue 作业设置,它使用 JDBC 连接将 Glue 表中的数据写入我们的 Amazon Redshift 数据库。Job 还负责映射列和创建红移表。

通过重新运行作业,我在 redshift 中得到了重复的行(如预期的那样)。但是,有没有办法在插入新数据之前替换或删除行,使用键或胶水设置的分区?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()
4

6 回答 6

19

工作书签是关键。只需编辑作业并启用“作业书签”,它就不会处理已处理的数据。请注意,作业必须重新运行一次才能检测到它不必再次重新处理旧数据。

有关更多信息,请参阅: http ://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

在我看来,“书签”这个名字有点牵强。如果我在搜索过程中没有偶然发现它,我永远不会看它。

于 2017-09-20T08:49:51.750 回答
7

这是我从 AWS Glue Support 获得的解决方案:

您可能知道,虽然您可以创建主键,但 Redshift 并不强制执行唯一性。因此,如果您正在重新运行 Glue 作业,则可能会插入重复的行。保持唯一性的一些方法是:

  1. 使用临时表插入所有行,然后在主表中执行 upsert/merge [1],这必须在胶水之外完成。

  2. 在您的 redshift 表 [1] 中添加另一列,例如插入时间戳,以允许重复但要知道哪一列先出现或最后一列,然后在需要时删除重复项。

  3. 将之前插入的数据加载到dataframe中,然后比较要插入的数据,避免插入重复[3]

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.htmlhttp://www.silota.com/blog/amazon-redshift-upsert-support-临时表替换行/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

于 2017-09-20T15:23:40.780 回答
4

请检查这个答案。有解释和代码示例如何使用临时表将数据插入 Redshift。preactions在 Glue 使用和postactions选项写入数据之前或之后,可以使用相同的方法运行任何 SQL 查询:

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
于 2019-01-11T19:43:20.900 回答
3

今天,我已经测试并获得了使用 JDBC 连接从目标表中更新/删除的解决方法。

我已经使用如下

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pg8000
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...
config_port = ****
conn = pg8000.connect(
    database=args['DB'], 
    user=args['USER'], 
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
query = "UPDATE table .....;"

cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()



query1 = "DELETE  AAA FROM  AAA A, BBB B WHERE  A.id = B.id"

cur1 = conn.cursor()
cur1.execute(query1)
conn.commit()
cur1.close()
conn.close()
于 2018-07-04T13:27:56.940 回答
0

正如上面所建议的,Glue 中的作业书签选项应该可以解决问题。当我的源是 S3 时,我一直在成功使用它。 http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

于 2017-12-03T05:02:27.803 回答
0

根据我的测试(使用相同的场景),BOOKMARK 功能不起作用。多次运行作业时会插入重复数据。通过每天(通过 lambda)从 S3 位置删除文件并实施暂存和目标表,我已经解决了这个问题。数据将根据匹配的键列进行插入/更新。

于 2018-07-03T12:53:43.453 回答