2

我坚持从 dedupe.io 设置 python 和库 dedupe 以对 postgres 数据库中的一组条目进行重复数据删除。错误是 - “记录不符合数据模型”这应该很容易解决,但我只是不明白为什么会收到此消息。

我现在拥有的(重点代码并删除了其他功能)

# ## Setup
settings_file = 'lead_dedupe_settings'
training_file = 'lead_dedupe_training.json'

start_time = time.time()

...

def training():
    # We'll be using variations on this following select statement to pull
    # in campaign donor info.
    #
    # We did a fair amount of preprocessing of the fields in

    """ Define Lead Query """
    sql = "select id, phone, mobilephone, postalcode, email from dev_manuel.somedata"

    # ## Training

    if os.path.exists(settings_file):
        print('reading from ', settings_file)
        with open(settings_file, 'rb') as sf:
            deduper = dedupe.StaticDedupe(sf, num_cores=4)
    else:

        # Define the fields dedupe will pay attention to
        #
        # The address, city, and zip fields are often missing, so we'll
        # tell dedupe that, and we'll learn a model that take that into
        # account
        fields = [
                {'field': 'id', 'type': 'ShortString'},
                {'field': 'phone', 'type': 'String', 'has missing': True},
                {'field': 'mobilephone', 'type': 'String', 'has missing': True},
                {'field': 'postalcode', 'type': 'ShortString', 'has missing': True},
                {'field': 'email', 'type': 'String', 'has missing': True}
                ]

        # Create a new deduper object and pass our data model to it.
        deduper = dedupe.Dedupe(fields, num_cores=4)


        # connect to db and execute
        conn = None
        try:
            # read the connection parameters
            params = config()
            # connect to the PostgreSQL server
            conn = psycopg2.connect(**params)
            print('Connecting to the PostgreSQL database...')

            cur = conn.cursor()
            # excute sql
            cur.execute(sql)

            temp_d = dict((i, row) for i, row in enumerate(cur))

            print(temp_d)

            deduper.sample(temp_d, 10000)

            print('Done stage 1')

            del temp_d

            # close communication with the PostgreSQL database server
            cur.close()

        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()
                print('Closed Connection')

        # If we have training data saved from a previous run of dedupe,
        # look for it an load it in.
        #
        # __Note:__ if you want to train from
        # scratch, delete the training_file
        if os.path.exists(training_file):
            print('reading labeled examples from ', training_file)
            with open(training_file) as tf:
                deduper.readTraining(tf)

        # ## Active learning

        print('starting active labeling...')
        # Starts the training loop. Dedupe will find the next pair of records
        # it is least certain about and ask you to label them as duplicates
        # or not.

        # debug
        print(deduper)
        # vars(deduper)

        # use 'y', 'n' and 'u' keys to flag duplicates
        # press 'f' when you are finished
        dedupe.convenience.consoleLabel(deduper)
        # When finished, save our labeled, training pairs to disk
        with open(training_file, 'w') as tf:
            deduper.writeTraining(tf)

        # Notice our argument here
        #
        # `recall` is the proportion of true dupes pairs that the learned
        # rules must cover. You may want to reduce this if your are making
        # too many blocks and too many comparisons.
        deduper.train(recall=0.90)

        with open(settings_file, 'wb') as sf:
            deduper.writeSettings(sf)

        # We can now remove some of the memory hobbing objects we used
        # for training
        deduper.cleanupTraining()

错误消息是“记录不符合数据模型。字段 'id' 在 data_model 但不在记录中”。如您所见,我正在定义要“学习”的 5 个字段。我正在使用的查询准确地返回了这 5 列以及其中的数据。的输出

print(temp_d)

{0: ('00Q1o00000OjmQmEAJ', '+4955555555', None, '01561', None), 1: ('00Q1o00000JhgSUEAZ', None, '+4915555555', '27729', 'email@aemail.de')}

在我看来,这就像重复数据删除库的有效输入。

我试过的

  • 我检查了他是否已经编写了一个文件作为训练集,该文件会以某种方式被读取和使用,事实并非如此(代码甚至会这样说)
  • 我尝试调试字段定义等进入的“deduper”对象,我可以看到字段定义
  • 查看其他示例,例如 csv 或 mysql,它们的功能与我几乎相同。

请指出我错的方向。

4

1 回答 1

2

看起来问题可能是您的 temp_d 是元组字典,而不是字典字典的预期输入。我刚开始使用这个包,并在此处找到了一个适用于我的目的的示例,它提供了用于设置字典的功能,尽管它来自 csv 而不是你的数据提取。

data_d = {}
with open(filename) as f:
    reader = csv.DictReader(f)
    for row in reader:
        clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
        row_id = int(row['Id'])
        data_d[row_id] = dict(clean_row)

return data_d
于 2019-04-25T16:04:59.050 回答