0

如果仅问题表明缺乏知识,我们深表歉意。我有一个单一的来源,我已经根据处理的事件中某个键的值(通过侧面输出)进行了拆分。3/4 唯一事件类型有一个密钥,我可以使用它来确保它们属于同一个完整事务,但第 4 个事件只能通过某些时间戳标准配对 - 可能有 n > 1 个这样的事件需要配对。这是食品订单交付系统的示例;

Event1: {'order_id' : xxx, 'event_type' : 'order_confirmed', 'ts' : IS8601} 
Event2: {'order_id' : xxx, 'vehicle_number' : yyy, 'event_type' : 'order_picked_up_by_driver', 'ts' : IS8601} 
**Event3a: {'vehicle_number' : xxx ,'event_type' : 'driver_reached_checkpoint', 'ts' : IS8601} 
**Event3b: {'vehicle_number' : xxx, 'event_type' : 'driver_reached_checkpoint', 'ts' : IS8601 + ~1 hour} 
Event4: {'order_id' : xxx, 'event_type' : 'order_delivered', 'ts' : IS8601 + ~1 hour} 

请注意事件 1,2 和 4 如何具有 order_id 但 3a, 3b 没有(尽管可以与事件 2 配对)。目前的计划是通过 order_id 将事件 1、2、4 一起键入并存储状态以进行交付,直到已知 3a、3b 是唯一符合 Event2 和 4s 时间戳范围内的事件。由于车辆编号与 order_id 是一对多的关系,因此每当我看到新事件 2 并返回前一个订单的所有事件以进行进一步处理时,我都需要更新驾驶员的状态以指向新订单。

即使它是面包屑,真的可以使用一些方向来解决我需要解决的问题。Flink 和流媒体的新手。

--- 编辑 ---

我想存储每个订单的状态,一旦我看到每个事件类型的实例(在事件 3 的情况下是多个),将这些事件合并在一起,在输出到另一个源和接收器之前计算一些统计数据。我不能使用键控状态运算符,因为事件类型 3 没有可用的键“order_id”。我可以放心地假设事件是有序的。

示例输出,让 et = 事件类型

'order_completed_ts' : <et4>, 'number_of_checkpoints_en_route_to_completion' : <et3>}

不同的订单顺序可以有多个相同类型的事件。

样本

1. {'order_id' : 1, 'event_type' : 'order_confirmed', 'ts' : IS8601}
2. {'order_id' : 2, 'event_type' : 'order_confirmed', 'ts' : IS8601}
3. {'order_id' : 3, 'event_type' : 'order_confirmed', 'ts' : IS8601}
4. {'order_id' : 4, 'event_type' : 'order_confirmed', 'ts' : IS8601}
5. {'order_id' : 1, 'event_type' : 'order_picked_up_by_driver', 'ts' : IS8601} 
  • 请注意示例元素 5 如何包含与元素 1 相同的订单 ID
4

1 回答 1

0

正如我所看到的,您希望管道的第一阶段将 order_ids 与没有它们的事件相关联,这将使其余的分析更加直接。

如果您使用 Flink SQL,那么这将是一个临时表连接,您可以在vehicle_number. 您可以通过这样的方式执行此操作,即每个driver_reached_checkpoint事件都将与该order_picked_up_by_driver事件之前的最新事件相结合driver_reached_checkpoint

或者要使用 DataStream API 执行此操作,您可以通过 对流进行键控vehicle_number,并实现KeyedProcessFunction以键控状态记住最近order_picked_up_by_driver事件的 a。然后,它可以将order_id来自该事件的事件添加到driver_reached_checkpoint它看到的每个事件中。

一旦你有一个order_id关于每个事件的流,那么你可以按order_id(或 GROUP BY,如果你使用 SQL)作为键。

于 2020-06-15T08:41:08.123 回答