2

我是这个 Databricks 自动加载器的新手,我们有一个要求,我们需要通过 Databricks 自动加载器处理从 AWS s3 到增量表的数据。我正在测试这个自动加载器,所以我遇到了重复的问题,如果我上传一个名为 emp_09282021.csv 的文件,它具有与 emp_09272021.csv 相同的数据,那么它没有检测到任何重复,它只是插入它们,所以如果我有 5 行emp_09272021.csv 文件现在在我上传 emp_09282021.csv 文件时将变为 10 行。

下面是我尝试过的代码:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header",True) \
  .schema("id string,name string, age string,city string") \
  .load("s3://some-s3-path/source/") \
  .writeStream.format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "s3://some-s3-path/tgt_checkpoint_0928/") \
  .start("s3://some-s3-path/spark_stream_processing/target/")

请有任何指导来处理这个问题?

4

1 回答 1

2

检测重复不是自动加载器的任务,它为您提供了摄取数据的可能性,但您需要自己处理重复。有几种方法:

  • 使用内置dropDuplicates函数。建议将其与水印一起使用以避免创建巨大的状态,但您需要有一些列将用作事件时间,并且它应该是dropDuplicate列表的一部分(有关更多详细信息,请参阅文档):
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("col1", "eventTime")
  • 使用Delta 的合并功能- 您只需要插入不在 Delta 表中的数据,但您需要为此使用 foreachBatch。像这样的东西(请注意该表应该已经存在,或者您需要添加一个不存在的表的处理):
from delta.tables import *

def drop_duplicates(df, epoch):
  table = DeltaTable.forPath(spark, 
      "s3://some-s3-path/spark_stream_processing/target/")
   dname = "destination"
   uname = "updates"
   dup_columns = ["col1", "col2"]
   merge_condition = " AND ".join([f"{dname}.{col} = {uname}.{col}"
      for col in dup_columns])
   table.alias(dname).merge(df.alias(uname), merge_condition)\
     .whenNotMatchedInsertAll().execute()

# ....
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header",True) \
  .schema("id string,name string, age string,city string") \
  .load("s3://some-s3-path/source/") \
  .writeStream.foreachBatch(drop_duplicates)\
  .option("checkpointLocation", "s3://some-s3-path/tgt_checkpoint_0928/") \
  .start()

在此代码中,您需要更改dup_columns变量以指定用于检测重复项的列。

于 2021-09-28T10:51:42.513 回答