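I can insert data into Kafka using the Avro schema shown at the end of this post.
I want to select records from the topic and then filter the flights. For example, if two records have the same flight number, only the latest one should be kept, based on the timestamp in the Avro schema (the RecordDateTime field).
How can I do this? I want to remove duplicates that share the same flight number.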
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }
The output stream should look like this:
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
    .map((k, v) -> new KeyValue<>((String) v.getFlightStatus(), (Integer) v.getFlightNumber()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    // Apply COUNT method
    .count()
    // Write to stream specified by outputTopic
    .toStream()
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
Avro schema:
{
"namespace": "io.confluent.developer.avro",
"type": "record",
"name": "FlightData",
"fields": [
{"name": "FlightNumber", "type": "int"},
{"name": "OriginAirport", "type": "string"},
{"name": "DestinationAirport", "type": "string"},
{"name": "OriginDate", "type": "string"},
{"name": "OriginTime", "type": "string"},
{"name": "DestinationDate", "type": "string"},
{"name": "DestinationTime", "type": "string"},
{"name": "FlightStatus", "type": "string"},
{"name": "GateOut", "type": "string"},
{"name": "GateIn", "type": "string"},
{"name": "RecordDateTime", "type": "string"}
]
}
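To make the requirement concrete, the "keep only the newest record per FlightNumber" rule can be sketched in plain Java, without any Kafka dependency. This is only an illustration under assumptions: the Flight class is a hypothetical stand-in for the Avro-generated FlightData class, the timestamps in main are made-up sample values, and RecordDateTime is assumed to hold an ISO-8601 date-time such as "2020-07-26T11:00:00" (the schema only says it is a string).

```java
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "keep the newest record per FlightNumber".
final class FlightDeduplicator {

    // Hypothetical stand-in for the Avro-generated FlightData class;
    // only the fields needed for the selection rule are kept.
    static final class Flight {
        final int flightNumber;
        final String flightStatus;
        // ASSUMPTION: ISO-8601 local date-time, e.g. "2020-07-26T11:00:00".
        final String recordDateTime;

        Flight(int flightNumber, String flightStatus, String recordDateTime) {
            this.flightNumber = flightNumber;
            this.flightStatus = flightStatus;
            this.recordDateTime = recordDateTime;
        }
    }

    // Returns the record with the later RecordDateTime; on a tie the
    // incoming record wins.
    static Flight newer(Flight current, Flight incoming) {
        LocalDateTime currentTime = LocalDateTime.parse(current.recordDateTime);
        LocalDateTime incomingTime = LocalDateTime.parse(incoming.recordDateTime);
        return incomingTime.isBefore(currentTime) ? current : incoming;
    }

    // Folds the records into one entry per flight number, keeping the newest.
    static Map<Integer, Flight> latestPerFlight(List<Flight> records) {
        Map<Integer, Flight> latest = new LinkedHashMap<>();
        for (Flight flight : records) {
            // merge() calls newer(existingValue, flight) when the key is present.
            latest.merge(flight.flightNumber, flight, FlightDeduplicator::newer);
        }
        return latest;
    }

    public static void main(String[] args) {
        // Made-up sample data: flight 1 appears twice with different timestamps.
        List<Flight> input = Arrays.asList(
            new Flight(1, "Scheduled", "2020-07-26T11:00:00"),
            new Flight(2, "Delayed", "2020-07-26T09:00:00"),
            new Flight(1, "Scheduled", "2020-07-26T08:00:00"));

        for (Flight flight : latestPerFlight(input).values()) {
            System.out.println(flight.flightNumber + " " + flight.flightStatus
                + " " + flight.recordDateTime);
        }
    }
}
```

The two-argument newer function has the same shape as a Kafka Streams Reducer, so one possible approach (not verified against this topology) is to re-key the stream by FlightNumber, call groupByKey, and pass an equivalent function to reduce instead of using count. Note that reduce produces a KTable, so converting it back to a stream emits an updated record each time a newer one arrives for a flight, rather than a single final record per flight.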