
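I am able to insert data into Kafka using the Avro schema shown below.

I want to select records from the topic and then filter the flights. For example, consider two records with the same flight number: we only need to keep the latest one, based on the timestamp field in the Avro schema.

How can I do this? I want to remove duplicates that share the same flight number.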

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }

The output stream should look like this:

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }

builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
        // Re-key each record by flight status, keeping the flight number as the value
        .map((k, v) -> new KeyValue<>((String) v.getFlightStatus(), (Integer) v.getFlightNumber()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        // Apply COUNT method
        .count()
        // Write to stream specified by outputTopic
        .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

Avro schema:

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "FlightData",
  "fields": [
    {"name": "FlightNumber", "type": "int"},
    {"name": "OriginAirport", "type": "string"},
    {"name": "DestinationAirport", "type": "string"},
    {"name": "OriginDate", "type": "string"},
    {"name": "OriginTime", "type": "string"},
    {"name": "DestinationDate", "type": "string"},
    {"name": "DestinationTime", "type": "string"},
    {"name": "FlightStatus", "type": "string"},
    {"name": "GateOut", "type": "string"},
    {"name": "GateIn", "type": "string"},
    {"name": "RecordDateTime", "type": "string"}
  ]
}

2 Answers

0

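The main problem you need to solve is how long you want to wait before emitting a result record. When you receive the first record, you do not know whether you can emit it right away, or whether a duplicate (with a larger or smaller timestamp) might still arrive later.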

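Therefore, you need to define a window and use an aggregation that keeps only one record per key and per window. Inside this aggregation, you can compare the timestamps and keep only the record you want.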
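A minimal plain-Java sketch of that aggregation step, without any Kafka dependency: records are bucketed into fixed-size windows, and for each (flight number, window) pair only the record with the largest timestamp survives. The `Flight` class, the `windowStart` helper, and the use of epoch milliseconds for the timestamp are assumptions made for illustration. In Kafka Streams, a comparison of the same shape could be passed as the reducer to `reduce()` on a grouped, windowed stream.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LatestPerWindow {

    /** Hypothetical stand-in for the Avro-generated FlightData class. */
    static final class Flight {
        final int flightNumber;
        final String status;
        final long timestampMs; // assumed: RecordDateTime already parsed to epoch millis

        Flight(int flightNumber, String status, long timestampMs) {
            this.flightNumber = flightNumber;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the fixed-size (tumbling) window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Reducer: of two records for the same key, keep the one with the larger timestamp. */
    static Flight keepLatest(Flight current, Flight candidate) {
        return candidate.timestampMs > current.timestampMs ? candidate : current;
    }

    /** Keeps one record per (flight number, window start), in first-seen order. */
    static Map<Map.Entry<Integer, Long>, Flight> aggregate(List<Flight> input, long windowSizeMs) {
        Map<Map.Entry<Integer, Long>, Flight> result = new LinkedHashMap<>();
        for (Flight f : input) {
            Map.Entry<Integer, Long> key =
                    new SimpleImmutableEntry<>(f.flightNumber, windowStart(f.timestampMs, windowSizeMs));
            // merge() calls keepLatest(existing, f) when the key is already present
            result.merge(key, f, LatestPerWindow::keepLatest);
        }
        return result;
    }
}
```

The window size is the knob that answers the "how long do I wait" question: a larger window catches duplicates that are further apart in time, at the cost of a later result.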

After the aggregation, you can use suppress() to emit only a single, final result record once the window closes.
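To illustrate what emitting only a final result per window means, here is a small plain-Java model of that behavior (it is not the Kafka Streams implementation). The latest record per key is buffered per window, and a window's contents are released only once the observed stream time has reached the window end. `FinalResultBuffer`, `Rec`, and the zero grace period are assumptions for this sketch; in the DSL, this role is played by `suppress(Suppressed.untilWindowCloses(...))` applied to the result of the windowed aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FinalResultBuffer {

    /** Hypothetical record type: key, payload, and event timestamp in epoch millis. */
    static final class Rec {
        final int key;
        final String value;
        final long timestampMs;

        Rec(int key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowSizeMs;
    // window start -> (key -> latest record seen in that window), sorted by window start
    private final TreeMap<Long, Map<Integer, Rec>> open = new TreeMap<>();
    // highest timestamp observed so far
    private long streamTimeMs = Long.MIN_VALUE;

    FinalResultBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers the record and returns the final results of every window that has now closed. */
    List<Rec> add(Rec r) {
        long start = r.timestampMs - Math.floorMod(r.timestampMs, windowSizeMs);
        streamTimeMs = Math.max(streamTimeMs, r.timestampMs);

        // Records for an already closed window are dropped (no grace period in this sketch).
        if (start + windowSizeMs > streamTimeMs) {
            open.computeIfAbsent(start, s -> new HashMap<>())
                .merge(r.key, r, (old, cur) -> cur.timestampMs > old.timestampMs ? cur : old);
        }

        List<Rec> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<Integer, Rec>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<Integer, Rec>> e = it.next();
            if (e.getKey() + windowSizeMs > streamTimeMs) {
                break; // windows are sorted, so every later window is still open
            }
            closed.addAll(e.getValue().values());
            it.remove();
        }
        return closed;
    }
}
```

Note that nothing is emitted until a record with a later timestamp arrives and advances the stream time, which mirrors why suppressed results only appear when new input keeps flowing.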

Answered 2021-01-03T22:02:34.010
-1

by considering the timestamp mentioned in the Avro schema

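That is what the TimestampExtractor interface is for. Alternatively, you can change the upstream producer so that this timestamp becomes the actual record timestamp.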
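Since `RecordDateTime` is declared as a string in the schema, a custom extractor would mostly consist of parsing that string into epoch milliseconds. A hedged sketch of just that parsing step in plain Java follows. The ISO-8601 local date-time format and the UTC zone are guesses, since the question does not show real timestamp values, and `RecordTimeParser` is a hypothetical helper. Inside a `TimestampExtractor` implementation, a method like this could be called from `extract(...)`, with the partition time that Kafka Streams passes in used as the fallback.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

final class RecordTimeParser {

    /**
     * Parses a value such as "2020-07-26T11:00:00" (assumed to be UTC) into epoch millis.
     * Returns fallbackMs when the value is missing or not in the assumed format.
     */
    static long toEpochMillis(CharSequence recordDateTime, long fallbackMs) {
        if (recordDateTime == null) {
            return fallbackMs;
        }
        try {
            return LocalDateTime.parse(recordDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMs;
        }
    }
}
```

Returning a fallback instead of throwing keeps a single malformed record from stopping the application; whether that is the right policy depends on how much the data can be trusted.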

two records with the same flight number. We only need to select the latest one

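This is the default behavior for ordered records with the same key arriving on the source topic. However, you will need logic to handle late-arriving data and to skip any record whose timestamp is older than the one already seen. This is easier to do with the Processor API than with the Streams DSL, and you will need the Processor API anyway to access the state store and check the table contents.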
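The late-data check can be modeled in plain Java as follows. This is only a sketch of the logic a custom processor might run against its state store, with a `HashMap` standing in for the store; `LatestFlightFilter`, `Update`, and the epoch-millisecond timestamps are assumptions for illustration and are not part of the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class LatestFlightFilter {

    /** Hypothetical stand-in for a flight record with an already parsed timestamp. */
    static final class Update {
        final int flightNumber;
        final String status;
        final long timestampMs;

        Update(int flightNumber, String status, long timestampMs) {
            this.flightNumber = flightNumber;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    // Stands in for a key-value state store: flight number -> newest update seen so far.
    private final Map<Integer, Update> store = new HashMap<>();

    /**
     * Returns the update if it is newer than the stored one for its flight number.
     * Returns empty for late or duplicate data, which is skipped and not stored.
     */
    Optional<Update> process(Update u) {
        Update stored = store.get(u.flightNumber);
        if (stored != null && u.timestampMs <= stored.timestampMs) {
            return Optional.empty();
        }
        store.put(u.flightNumber, u);
        return Optional.of(u);
    }

    /** Current table content for one flight number, or null if none has been seen. */
    Update latest(int flightNumber) {
        return store.get(flightNumber);
    }
}
```

Unlike the windowed approach, this variant emits every improvement immediately, so downstream consumers may see several updates per flight but never an older record after a newer one.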

Answered 2020-07-27T14:03:49.033