I have a Faust application with two topics. The first one receives raw data in the following format:
# Input of first topic
{
timestamp: 2021-08-24 05:35:24,
value: 40,
device_id: ABC
}
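An async function, interpolate, consumes this topic and computes an interpolated value at every full 15-minute mark. To do this, I use a table that stores the last message seen for each device_id. Say the last message seen for ABC was: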
{
timestamp: 2021-08-24 04:55:44,
value: 30,
device_id: ABC
}
Then I would do the following:
table = app.Table('last_msg_store',
                  default=lambda: None)

def interpolate_values(last_msg, cur_msg):
    ...

@app.agent(source_topic)
async def interpolate(msgs):
    async for msg in msgs:
        device_id = get_key(msg)
        last_msg = table[device_id]
        if last_msg is None:
            # table is empty
            table[device_id] = msg
            continue
        timestamps, values = interpolate_values(last_msg, msg)
        print(timestamps)  # prints ['2021-08-24 05:00:00', '2021-08-24 05:15:00', '2021-08-24 05:30:00']
        print(values)  # prints [31.09, 34.94, 38.79]
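Since the body of interpolate_values is elided above, here is a minimal sketch of what such a helper might look like, assuming plain linear interpolation and messages represented as dicts with the fields shown earlier. The constants FMT and STEP are hypothetical names introduced for illustration. With the two example messages it yields the same three timestamps as printed above; the values come out close to, but not identical with, the printed ones, so the real helper presumably does something slightly different.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"      # assumed timestamp format of the raw messages
STEP = timedelta(minutes=15)   # interpolation grid: every full 15 minutes


def interpolate_values(last_msg, cur_msg):
    """Linearly interpolate between two messages at every full 15-minute mark."""
    t0 = datetime.strptime(last_msg["timestamp"], FMT)
    t1 = datetime.strptime(cur_msg["timestamp"], FMT)
    v0, v1 = last_msg["value"], cur_msg["value"]

    # First full quarter hour strictly after the previous message.
    floor = t0.replace(minute=t0.minute - t0.minute % 15, second=0, microsecond=0)
    tick = floor + STEP

    timestamps, values = [], []
    span = (t1 - t0).total_seconds()
    while tick <= t1:
        frac = (tick - t0).total_seconds() / span
        timestamps.append(tick.strftime(FMT))
        values.append(round(v0 + (v1 - v0) * frac, 2))
        tick += STEP
    return timestamps, values


last_msg = {"timestamp": "2021-08-24 04:55:44", "value": 30, "device_id": "ABC"}
cur_msg = {"timestamp": "2021-08-24 05:35:24", "value": 40, "device_id": "ABC"}
timestamps, values = interpolate_values(last_msg, cur_msg)
print(timestamps)  # ['2021-08-24 05:00:00', '2021-08-24 05:15:00', '2021-08-24 05:30:00']
```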
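In another topic, target_topic, I want to compute the deltas of the values in chronological order. For this I created another helper table that again stores the last message seen, grouped by device_id. My question is: how can I make sure that the order of the events/messages does not change, so that my delta calculation is correct?
My current approach looks like this: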
@app.agent(source_topic)
async def interpolate(msgs):
    async for msg in msgs:
        # ... same as before ...
        for timestamp, value in zip(timestamps, values):
            new_msg = generate_new_message(timestamp, value, device_id)
            await target_topic.send(value=new_msg)

@app.agent(target_topic)
async def delta(msgs):
    async for msg in msgs.group_by(get_key, name='delta_key'):
        device_id = get_key(msg)
        last_msg = delta_table[device_id]
        if last_msg is None:
            # table is empty
            delta_table[device_id] = msg
            continue
        delta_value = msg.value - last_msg.value
        delta_timestamps = msg.timestamp - last_msg.timestamp
        # ... further processing of the data
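To make the delta step concrete without a running Faust worker, here is a minimal sketch that uses a plain dict as a stand-in for delta_table and dicts for messages. The values, the FMT constant, and the choice to store every message as the new "last" message are assumptions for illustration, since that part of the agent is not shown above.

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%SZ"  # assumed timestamp format of the interpolated messages
delta_table = {}            # stand-in for the Faust table, keyed by device_id


def delta(msg):
    """Return (delta_value, delta_seconds) against the previous message, or None."""
    device_id = msg["device_id"]
    last_msg = delta_table.get(device_id)
    delta_table[device_id] = msg  # remember this message for the next call
    if last_msg is None:
        return None  # first message for this device, nothing to compare with
    delta_value = msg["value"] - last_msg["value"]
    delta_seconds = (
        datetime.strptime(msg["timestamp"], FMT)
        - datetime.strptime(last_msg["timestamp"], FMT)
    ).total_seconds()
    return delta_value, delta_seconds


# Hypothetical values; only the timestamps follow the 15-minute grid from the question.
print(delta({"timestamp": "2021-08-24T01:15:00Z", "value": 10.0, "device_id": "ABC"}))
print(delta({"timestamp": "2021-08-24T01:30:00Z", "value": 12.5, "device_id": "ABC"}))
```

The result is only meaningful if messages for a given device_id arrive in timestamp order, which is exactly what the question is about.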
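Of course, I would expect messages to appear in the next topic in the same order in which they were sent, but this is not always the case.
This is how the messages show up in my logs: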
# Sending order is correct
[2021-09-02 12:48:55,684] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 01:15:00Z', value=...>
[2021-09-02 12:48:55,685] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 01:30:00Z', value=...>
[2021-09-02 12:48:55,690] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 01:45:00Z', value=...>
[2021-09-02 12:48:55,691] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:00:00Z', value=...>
[2021-09-02 12:48:55,693] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:15:00Z', value=...>
[2021-09-02 12:48:55,694] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:30:00Z', value=...>
[2021-09-02 12:48:55,696] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:45:00Z', value=...>
[2021-09-02 12:48:55,698] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 03:00:00Z', value=...>
[2021-09-02 12:48:55,700] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 03:15:00Z', value=...>
[2021-09-02 12:48:55,704] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 03:30:00Z', value=...>
# Appearance order is messed up. They are sent to the same partition because of ".group_by(...)".
[2021-09-02 12:48:56,007] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T01:15:00Z', value=...>
[2021-09-02 12:48:56,017] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T03:00:00Z', value=...>
[2021-09-02 12:48:56,020] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:30:00Z', value=...>
[2021-09-02 12:48:56,021] Negative/No DURATION detected -1800.0!
[2021-09-02 12:48:56,080] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T03:30:00Z', value=...>
[2021-09-02 12:48:56,092] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T01:45:00Z', value=...>
[2021-09-02 12:48:56,093] Negative/No DURATION detected -6300.0!
[2021-09-02 12:48:56,096] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:45:00Z', value=...>
[2021-09-02 12:48:56,098] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T01:30:00Z', value=...>
[2021-09-02 12:48:56,099] Negative/No DURATION detected -4500.0!
[2021-09-02 12:48:56,100] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:00:00Z', value=...>
[2021-09-02 12:48:56,104] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:15:00Z', value=...>
[2021-09-02 12:48:56,107] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T03:15:00Z', value=...>
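As a sanity check, the three negative durations in the log can be reproduced by taking the difference between each received timestamp and the one received immediately before it. The sketch below uses only the timestamps copied from the log above; the helper name durations is introduced for illustration.

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%SZ"

# Timestamps in the order they were received on partition 4 (copied from the log).
received = [
    "2021-08-24T01:15:00Z",
    "2021-08-24T03:00:00Z",
    "2021-08-24T02:30:00Z",
    "2021-08-24T03:30:00Z",
    "2021-08-24T01:45:00Z",
    "2021-08-24T02:45:00Z",
    "2021-08-24T01:30:00Z",
    "2021-08-24T02:00:00Z",
    "2021-08-24T02:15:00Z",
    "2021-08-24T03:15:00Z",
]


def durations(timestamps):
    """Seconds between each timestamp and the one before it."""
    parsed = [datetime.strptime(ts, FMT) for ts in timestamps]
    return [(b - a).total_seconds() for a, b in zip(parsed, parsed[1:])]


negative = [d for d in durations(received) if d <= 0]
print(negative)  # [-1800.0, -6300.0, -4500.0]
```

In the order the messages were sent (which here equals sorted order), every duration would be 900 seconds, so the negative values come purely from the reordering between the two agents.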