
I'm new to Python and am trying to produce Avro messages with 'confluent_kafka', using 'confluent_kafka.schema_registry.avro.AvroSerializer' for this (reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py).

It works for simple Avro schemas with dict input (JSON converted to a dict), but for the following example schema I get an error:

Schema:

{
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [{
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "Value",
            "fields": [{
                "name": "EventId",
                "type": "long"
            }, {
                "name": "CameraId",
                "type": ["null", "long"],
                "default": null
            }],
            "connect.name": "CoreOLTPEvents.dbo.Event.Value"
        }],
        "default": null
    }, {
        "name": "after",
        "type": ["null", "Value"],
        "default": null
    }, {
        "name": "source",
        "type": {
            "type": "record",
            "name": "Source",
            "namespace": "io.debezium.connector.sqlserver",
            "fields": [{
                "name": "version",
                "type": "string"
            }, {
                "name": "connector",
                "type": "string"
            }],
            "connect.name": "io.debezium.connector.sqlserver.Source"
        }
    }, {
        "name": "op",
        "type": "string"
    }],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}

Input JSON:

{
    "after": null,
    "before": {
        "CoreOLTPEvents.dbo.Event.Value": {
            "EventId": 1111111111,
            "CameraId": 222222222
        }
    },
    "source": {
        "version": "InitialLoad",
        "connector": "sqlserver"
    },
    "op": "C"
}

Error: ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111, 'CameraId': 222222222}} (type <class 'dict'>) do not match ['null', {'connect.name': 'CoreOLTPEvents.dbo.Event.Value', 'type': 'record', 'name': 'CoreOLTPEvents.dbo.Event.Value', 'fields': [{'name': 'EventId', 'type': 'long'}, {'default': None, 'name': 'CameraId', 'type': ['null', 'long']}]}] on field before

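The 'before' field type is a union (['null', record]); if I change it to just the record (removing the union), it works fine. But I need to adjust my input so that it works with the schema as given.

(Note: I am reading the JSON input with 'json.load(json_file)', so it yields a dict.)

Any help would be much appreciated.

Update: the actual, larger schema: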

{
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [{
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "Value",
            "fields": [{
                "name": "EventId",
                "type": "long"
            }, {
                "name": "CameraId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "SiteId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "VehicleId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "EventReviewStatusID",
                "type": "int"
            }, {
                "name": "EventTypeId",
                "type": ["null", "int"],
                "default": null
            }, {
                "name": "EventDateTime",
                "type": ["null", {
                    "type": "string",
                    "connect.name": "net.smartdrive.converters.SmartdriveEventDateFieldConverter"
                }],
                "default": null
            }, {
                "name": "FTPUploadDateTime",
                "type": {
                    "type": "long",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.Timestamp"
                }
            }, {
                "name": "CAMFileName",
                "type": "string"
            }, {
                "name": "KeypadEntryCode",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "IsActive",
                "type": {
                    "type": "boolean",
                    "connect.default": true
                },
                "default": true
            }, {
                "name": "Flagged",
                "type": "boolean"
            }, {
                "name": "EventTitle",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "CreatedBy",
                "type": "long"
            }, {
                "name": "CreatedDate",
                "type": {
                    "type": "long",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.Timestamp"
                }
            }, {
                "name": "ModifiedBy",
                "type": "long"
            }, {
                "name": "ModifiedDate",
                "type": {
                    "type": "long",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.Timestamp"
                }
            }, {
                "name": "ReReviewAnalysis",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "LegacyEventId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "TripId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "FileVersion",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "EventNumber",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "Latitude",
                "type": ["null", {
                    "type": "bytes",
                    "scale": 10,
                    "precision": 13,
                    "connect.version": 1,
                    "connect.parameters": {
                        "scale": "10",
                        "connect.decimal.precision": "13"
                    },
                    "connect.name": "org.apache.kafka.connect.data.Decimal",
                    "logicalType": "decimal"
                }],
                "default": null
            }, {
                "name": "Longitude",
                "type": ["null", {
                    "type": "bytes",
                    "scale": 10,
                    "precision": 13,
                    "connect.version": 1,
                    "connect.parameters": {
                        "scale": "10",
                        "connect.decimal.precision": "13"
                    },
                    "connect.name": "org.apache.kafka.connect.data.Decimal",
                    "logicalType": "decimal"
                }],
                "default": null
            }, {
                "name": "GeoAddressId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "ReviewedEventId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "VideoStatus",
                "type": {
                    "type": "int",
                    "connect.default": 0
                },
                "default": 0
            }, {
                "name": "PredictionImportance",
                "type": ["null", {
                    "type": "bytes",
                    "scale": 10,
                    "precision": 15,
                    "connect.version": 1,
                    "connect.parameters": {
                        "scale": "10",
                        "connect.decimal.precision": "15"
                    },
                    "connect.name": "org.apache.kafka.connect.data.Decimal",
                    "logicalType": "decimal"
                }],
                "default": null
            }, {
                "name": "FlaggedBy",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "FlaggedDate",
                "type": ["null", {
                    "type": "long",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.Timestamp"
                }],
                "default": null
            }, {
                "name": "TriggerTypeId",
                "type": ["null", "int"],
                "default": null
            }, {
                "name": "VideoDeleteDate",
                "type": ["null", {
                    "type": "long",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.Timestamp"
                }],
                "default": null
            }, {
                "name": "MetadataDeleteDate",
                "type": ["null", {
                    "type": "long",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.Timestamp"
                }],
                "default": null
            }, {
                "name": "RetentionStatus",
                "type": {
                    "type": "int",
                    "connect.default": 0,
                    "connect.type": "int16"
                },
                "default": 0
            }, {
                "name": "PartnerTriggerId",
                "type": ["null", "int"],
                "default": null
            }, {
                "name": "CoachingStateId",
                "type": {
                    "type": "int",
                    "connect.default": 0,
                    "connect.type": "int16"
                },
                "default": 0
            }, {
                "name": "EventKudoHistoryId",
                "type": ["null", "int"],
                "default": null
            }],
            "connect.name": "CoreOLTPEvents.dbo.Event.Value"
        }],
        "default": null
    }, {
        "name": "after",
        "type": ["null", "Value"],
        "default": null
    }, {
        "name": "source",
        "type": {
            "type": "record",
            "name": "Source",
            "namespace": "io.debezium.connector.sqlserver",
            "fields": [{
                "name": "version",
                "type": "string"
            }, {
                "name": "connector",
                "type": "string"
            }, {
                "name": "name",
                "type": "string"
            }, {
                "name": "ts_ms",
                "type": "long"
            }, {
                "name": "snapshot",
                "type": [{
                    "type": "string",
                    "connect.version": 1,
                    "connect.parameters": {
                        "allowed": "true,last,false"
                    },
                    "connect.default": "false",
                    "connect.name": "io.debezium.data.Enum"
                }, "null"],
                "default": "false"
            }, {
                "name": "db",
                "type": "string"
            }, {
                "name": "schema",
                "type": "string"
            }, {
                "name": "table",
                "type": "string"
            }, {
                "name": "change_lsn",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "commit_lsn",
                "type": ["null", "string"],
                "default": null
            }, {
                "name": "event_serial_no",
                "type": ["null", "long"],
                "default": null
            }],
            "connect.name": "io.debezium.connector.sqlserver.Source"
        }
    }, {
        "name": "op",
        "type": "string"
    }, {
        "name": "ts_ms",
        "type": ["null", "long"],
        "default": null
    }, {
        "name": "transaction",
        "type": ["null", {
            "type": "record",
            "name": "ConnectDefault",
            "namespace": "io.confluent.connect.avro",
            "fields": [{
                "name": "id",
                "type": "string"
            }, {
                "name": "total_order",
                "type": "long"
            }, {
                "name": "data_collection_order",
                "type": "long"
            }]
        }],
        "default": null
    }],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}

Input for the larger schema:

{
    "before": null,
    "after": {
        "EventId": 1234566,
        "CameraId": 2233,
        "SiteId": 111,
        "VehicleId": 45587,
        "EventReviewStatusID": 10,
        "EventTypeId": 123,
        "EventDateTime": "2015-01-02T01:30:29Z",
        "FTPUploadDateTime": 1420193330590,
        "CAMFileName": "XYZ",
        "KeypadEntryCode": "0",
        "IsActive": false,
        "Flagged": false,
        "EventTitle": null,
        "CreatedBy": 1,
        "CreatedDate": 1420191120730,
        "ModifiedBy": 1,
        "ModifiedDate": 1577871185680,
        "ReReviewAnalysis": null,
        "LegacyEventId": null,
        "TripId": 3382,
        "FileVersion": "2.2",
        "EventNumber": "AAAA-BBBB",
        "Latitude": "UU9elrA=",
        "Longitude": "/ueZUeFw",
        "GeoAddressId": null,
        "ReviewedEventId": 129411077,
        "VideoStatus": 4,
        "PredictionImportance": 0.1402457539,
        "FlaggedBy": null,
        "FlaggedDate": null,
        "TriggerTypeId": 322,
        "VideoDeleteDate": 1422783120000,
        "MetadataDeleteDate": 1577871120000,
        "RetentionStatus": 15,
        "PartnerTriggerId": null,
        "CoachingStateId": 0,
        "EventKudoHistoryId": null

    },
    "source": {
        "version": "Final",
        "connector": "sqlserver",
        "name": "CoreOLTP",
        "ts_ms": 1615813992548,
        "snapshot": "false",
        "db": "CoreOLTP",
        "schema": "dbo",
        "table": "xyz",
        "change_lsn": null,
        "commit_lsn": null,
        "event_serial_no": null
    },
    "op": "C",
    "ts_ms": 1615813992548,
    "transaction": null
}

Error:

confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="{'EventId': 129411077, 'CameraId': 46237, 'SiteId': 2148, 'VehicleId': 45587, 'EventReviewStatusID': 10, 'EventTypeId': 247, 'EventDateTime': '2015-01-02T01:30:29Z', 'FTPUploadDateTime': 1420191120590, 'CAMFileName': 'JD2BC02120150102013029ER.SDE', 'KeypadEntryCode': '0', 'IsActive': False, 'Flagged': False, 'EventTitle': None, 'CreatedBy': 1, 'CreatedDate': 1420191120730, 'ModifiedBy': 1, 'ModifiedDate': 1577871185680, 'ReReviewAnalysis': None, 'LegacyEventId': None, 'TripId': 3382, 'FileVersion': '2.2', 'EventNumber': 'WSHX-8QQ2', 'Latitude': 'UU9elrA=', 'Longitude': '/ueZUeFw', 'GeoAddressId': None, 'ReviewedEventId': 129411077, 'VideoStatus': 4, 'PredictionImportance': 0.1402457539, 'FlaggedBy': None, 'FlaggedDate': None, 'TriggerTypeId': 322, 'VideoDeleteDate': 1422783120000, 'MetadataDeleteDate': 1577871120000, 'RetentionStatus': 15, 'PartnerTriggerId': None, 'CoachingStateId': 0, 'EventKudoHistoryId': None} (type <class 'dict'>) do not match ['null', 'CoreOLTPEvents.dbo.Event.Value'] on field after"}

1 Answer


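You just need to change your input so that the value of the before field is not wrapped in the namespaced type name. It needs to look like this: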

{
    "after": null,
    "before": {
        "EventId": 1111111111,
        "CameraId": 222222222
    },
    "source": {
        "version": "InitialLoad",
        "connector": "sqlserver"
    },
    "op": "C"
}

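Your original input looks like it was trying to use JSON-encoded Avro, because the value of the before field is wrapped in the CoreOLTPEvents.dbo.Event.Value name. However, I'm guessing it was crafted by hand, because in that encoding CameraId should have been specified as {"long": 222222222} rather than just 222222222.

If you really do have Avro-encoded JSON (as the output of some other process, say), you can read the file with something like fastavro.json_reader, which creates the correct in-memory representation (one that does not include the type-information wrapper for unions).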

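Update:

To figure out what is wrong with the full schema and the full data, I first loaded both objects with json.load and then called fastavro.validate(record, schema). Its output is a stack trace that ends with this: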

fastavro._validate_common.ValidationError: [
  "CoreOLTPEvents.dbo.Event.Envelope.after is <{'EventId': 1234566, 'CameraId': 2233, 'SiteId': 111, 'VehicleId': 45587, 'EventReviewStatusID': 10, 'EventTypeId': 123, 'EventDateTime': '2015-01-02T01:30:29Z', 'FTPUploadDateTime': 1420193330590, 'CAMFileName': 'XYZ', 'KeypadEntryCode': '0', 'IsActive': False, 'Flagged': False, 'EventTitle': None, 'CreatedBy': 1, 'CreatedDate': 1420191120730, 'ModifiedBy': 1, 'ModifiedDate': 1577871185680, 'ReReviewAnalysis': None, 'LegacyEventId': None, 'TripId': 3382, 'FileVersion': '2.2', 'EventNumber': 'AAAA-BBBB', 'Latitude': 'UU9elrA=', 'Longitude': '/ueZUeFw', 'GeoAddressId': None, 'ReviewedEventId': 129411077, 'VideoStatus': 4, 'PredictionImportance': 0.1402457539, 'FlaggedBy': None, 'FlaggedDate': None, 'TriggerTypeId': 322, 'VideoDeleteDate': 1422783120000, 'MetadataDeleteDate': 1577871120000, 'RetentionStatus': 15, 'PartnerTriggerId': None, 'CoachingStateId': 0, 'EventKudoHistoryId': None}> of type <class 'dict'> expected null",
  "CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected null",
  "CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected {'scale': 10, 'precision': 13, 'connect.version': 1, 'connect.parameters': {'scale': '10', 'connect.decimal.precision': '13'}, 'connect.name': 'org.apache.kafka.connect.data.Decimal', 'logicalType': 'decimal', 'type': 'bytes'}"
]

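So the trace is trying to tell us that there are 3 potential problems. The first is that the value of after does not match null, but we can ignore that one because we do not want after to match null.

The latter two are the actual problem. They say that the value of Latitude is the string UU9elrA=, which matches neither null nor bytes. The string looks base64-encoded, so maybe you have some code that decodes it to bytes; if so, the actual problem may be something else, but in that case I think you should be able to use fastavro.validate to find out what it is.

Answered 2021-03-23T12:27:25.693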
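If no such decoding step exists yet, something along these lines might serve as a starting point. It is a sketch under two assumptions: that the strings are base64 text of the decimal's unscaled value stored as big-endian two's-complement bytes (the usual layout for the Avro decimal logical type), and that the scale is the 10 declared in the schema. The helper name decode_connect_decimal is made up for this example.

```python
import base64
from decimal import Decimal


def decode_connect_decimal(b64_text: str, scale: int) -> Decimal:
    """Hypothetical helper: base64 text -> bytes -> Decimal with the given scale."""
    raw = base64.b64decode(b64_text)
    # Assumption: the bytes hold the unscaled value as a big-endian signed integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


latitude_bytes = base64.b64decode("UU9elrA=")  # raw bytes, what the schema's "bytes" type holds
latitude = decode_connect_decimal("UU9elrA=", scale=10)
longitude = decode_connect_decimal("/ueZUeFw", scale=10)

print(latitude_bytes)  # b'QO^\x96\xb0'
print(latitude)        # 34.9223950000
print(longitude)       # -120.4313530000
```

Both results look like plausible coordinates, which supports the base64 guess. Whether the serializer should then be given the raw bytes or the Decimal depends on how the library handles the decimal logical type, so it is worth re-running fastavro.validate on the converted record to confirm.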