I have a Faust application with two topics. The first one receives raw data in the following format:

# Input of first topic
{
    timestamp: 2021-08-24 05:35:24,
    value: 40,
    device_id: ABC
}

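An async function, interpolate, consumes this topic and computes an interpolated value at every full 15 minutes. To do this, I use a table that stores the last message seen for each device_id. Say the last message seen for device_id ABC was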

{
    timestamp: 2021-08-24 04:55:44,
    value: 30,
    device_id: ABC
}

Then I would do the following:

# Stores the last message seen per device_id; missing keys yield None
table = app.Table('last_msg_store',
                  default=lambda: None)

def interpolate_values(last_msg, cur_msg):
    ...

@app.agent(source_topic)
async def interpolate(msgs):
    async for msg in msgs:
        device_id = get_key(msg)

        last_msg = table[device_id]

        if last_msg is None:
            # table is empty
            table[device_id] = msg
            continue

        timestamps, values = interpolate_values(last_msg, msg)
        
        print(timestamps)  # prints ['2021-08-24 05:00:00', '2021-08-24 05:15:00', '2021-08-24 05:30:00']
        print(values)  # prints [31.09, 34.94, 38.79]
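
For readers who want to reproduce this, here is a minimal sketch of what interpolate_values could look like. The real body is elided above, so this is only an assumption: it uses plain linear interpolation, treats messages as dicts with "timestamp" and "value" keys, and rounds to two decimals. The values it produces will not necessarily match the ones printed above.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"      # timestamp format used in the raw messages
STEP = timedelta(minutes=15)   # interpolation grid: every full quarter hour


def interpolate_values(last_msg, cur_msg):
    """Linearly interpolate between two messages at every full 15-minute mark.

    Hypothetical sketch: messages are assumed to be dicts with
    'timestamp' and 'value' keys.
    """
    t0 = datetime.strptime(last_msg["timestamp"], FMT)
    t1 = datetime.strptime(cur_msg["timestamp"], FMT)
    v0, v1 = last_msg["value"], cur_msg["value"]

    timestamps, values = [], []
    if t1 <= t0:
        # Out-of-order or duplicate message: nothing to interpolate
        return timestamps, values

    # First full quarter hour strictly after t0
    floor = t0.replace(minute=t0.minute - t0.minute % 15, second=0, microsecond=0)
    tick = floor + STEP

    while tick <= t1:
        frac = (tick - t0) / (t1 - t0)  # fraction of the interval elapsed
        timestamps.append(tick.strftime(FMT))
        values.append(round(v0 + frac * (v1 - v0), 2))
        tick += STEP

    return timestamps, values
```

With the two example messages above (04:55:44 and 05:35:24), this sketch yields the three timestamps 05:00:00, 05:15:00 and 05:30:00.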

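In a second topic, target_topic, I want to compute the deltas between values in chronological order. For that I created another helper table that again stores the last message, grouped by device_id. My question is: how can I make sure the order of events/messages does not change, so that my delta calculation is correct?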

My current approach looks like this:

@app.agent(source_topic)
async def interpolate(msgs):
    # ... same as before ...
    for timestamp, value in zip(timestamps, values):
        new_msg = generate_new_message(timestamp, value, device_id)
        await target_topic.send(value=new_msg)

@app.agent(target_topic)
async def delta(msgs):
    async for msg in msgs.group_by(get_key, name='delta_key'):
        device_id = get_key(msg)

        last_msg = delta_table[device_id]

        if last_msg is None:
            # table is empty
            delta_table[device_id] = msg
            continue

        delta_value = msg.value - last_msg.value
        delta_timestamps = msg.timestamp - last_msg.timestamp

        # ... further processing of the data

Of course, I would expect messages to appear in the next topic in the same order in which they were inserted, but that is not always the case.

This is how the messages show up in my logs:

# Sending order is correct
[2021-09-02 12:48:55,684] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 01:15:00Z', value=...>
[2021-09-02 12:48:55,685] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 01:30:00Z', value=...>
[2021-09-02 12:48:55,690] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 01:45:00Z', value=...>
[2021-09-02 12:48:55,691] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:00:00Z', value=...>
[2021-09-02 12:48:55,693] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:15:00Z', value=...>
[2021-09-02 12:48:55,694] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:30:00Z', value=...>
[2021-09-02 12:48:55,696] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 02:45:00Z', value=...>
[2021-09-02 12:48:55,698] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 03:00:00Z', value=...>
[2021-09-02 12:48:55,700] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 03:15:00Z', value=...>
[2021-09-02 12:48:55,704] Forwarding to delta_topic: <Msg: timestamp='2021-08-24 03:30:00Z', value=...>

# Appearance order is messed up. They are all sent to the same partition because of ".group_by(...)".
[2021-09-02 12:48:56,007] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T01:15:00Z', value=...>
[2021-09-02 12:48:56,017] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T03:00:00Z', value=...>
[2021-09-02 12:48:56,020] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:30:00Z', value=...>
[2021-09-02 12:48:56,021] Negative/No DURATION detected -1800.0!
[2021-09-02 12:48:56,080] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T03:30:00Z', value=...>
[2021-09-02 12:48:56,092] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T01:45:00Z', value=...>
[2021-09-02 12:48:56,093] Negative/No DURATION detected -6300.0!
[2021-09-02 12:48:56,096] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:45:00Z', value=...>
[2021-09-02 12:48:56,098] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T01:30:00Z', value=...>
[2021-09-02 12:48:56,099] Negative/No DURATION detected -4500.0!
[2021-09-02 12:48:56,100] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:00:00Z', value=...>
[2021-09-02 12:48:56,104] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T02:15:00Z', value=...>
[2021-09-02 12:48:56,107] Partition: 4. Msg received: <Msg: timestamp='2021-08-24T03:15:00Z', value=...>
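
For context, the "Negative/No DURATION detected" lines come from a check on the time difference between consecutive messages. A minimal sketch of such a check follows. It is an assumption, not my exact code: the helper names parse_ts and compute_delta are hypothetical, and messages are treated as dicts.

```python
from datetime import datetime


def parse_ts(ts):
    # Timestamps on the receiving side look like '2021-08-24T01:15:00Z'
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")


def compute_delta(last_msg, msg):
    """Return (delta_value, duration_seconds), or None if msg is not newer.

    Hypothetical sketch: messages are assumed to be dicts with
    'timestamp' and 'value' keys.
    """
    duration = (parse_ts(msg["timestamp"]) - parse_ts(last_msg["timestamp"])).total_seconds()
    if duration <= 0:
        # Out-of-order (or duplicate) message: the delta would be meaningless
        print(f"Negative/No DURATION detected {duration}!")
        return None
    return msg["value"] - last_msg["value"], duration
```

For the pair 03:00:00 followed by 02:30:00 from the log, the duration is -1800.0 seconds, so no delta is produced.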