0

我有一个.ndjson文件如下所示:

{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
...

我使用 Apache Beam 读取它并将数据按 分组property_id,然后将输出写入 json 文件,但数据如下所示:

('107', [PPD(property_id='107', transaction_unique_id='{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}', price=80000, date_of_transfer='2021-05-07 00:00', postcode='BL2 2GY', property_type='F', old_new='N', duration='L', PAON='14', SAON='', street='RIVER VIEW COURT', locality='', town_city='BOLTON', district='BOLTON', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('108', [PPD(property_id='108', transaction_unique_id='{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}', price=330000, date_of_transfer='2021-02-26 00:00', postcode='SK6 4AN', property_type='S', old_new='N', duration='F', PAON='18', SAON='', street='GUYWOOD LANE', locality='ROMILEY', town_city='STOCKPORT', district='STOCKPORT', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('109', [PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}', price=215000, date_of_transfer='2021-02-19 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 022', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}', price=226500, date_of_transfer='2021-02-08 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 727', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}', price=262000, date_of_transfer='2021-05-14 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 025', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
...

我们可以看到 for property_id = '109',它分组了三个记录,但是上面的输出格式真的很奇怪......有没有人知道为什么会这样以及如何将其转换为换行符分隔的 JSON 格式,然后写入 JSON 文件?

预期的格式看起来像(不确定这是否是有效的换行符分隔的 json 格式,但想法是将相同的事务property_id(例如109)包含在数组中):

{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transactions": [{"transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}]}
...

有人可以帮忙吗,我很新,任何帮助将不胜感激。谢谢。

4

1 回答 1

1

我假设这PPD是一个命名元组,您正在获取PPD对象的 PCollection 并将它们分组为

grouped = (
    ppd_pcoll
    | beam.Map(lambda ppd: (ppd.property_id, property_id)
    | beam.GroupByKey())

现在grouped是 2 元组的 PCollection,其中第一个是属性 id 字符串,第二个是 PPD 的可迭代(具有该属性 id)。

为了得到你想要的,你需要将它映射到所需的字典以将其输出为 json,例如

to_write_to_json = (grouped
 | beam.MapTuple(lambda property_id, ppds: {
      'property_id': property_id,
      'transactions': [ppd_to_transaction(ppd) for ppd in ppds],
     })

whereppd_to_transaction是一个函数,它接受您的 PPD 对象并返回具有所需事务属性的 dict。

于 2021-08-16T22:36:38.163 回答