0

我的用例是实时识别 X 时间后未收到预期事件的实体。

例如:如果我们在时间 T 收到了 PaymentInitiated 事件,但在 T+X 时没有收到 PaymentFailed / PaymentAborted / PaymentSucedded 中的任何一个,则引发一个触发器,说明 PaymentStuck 以及 PaymentIntitiated 事件的详细信息。

我如何在 Apache Samza 中为此类用例建模,因为它在每个事件上滚动时间段 X,而不是固定时间间隔。

谢谢, 哈里什

4

1 回答 1

0

我不知道 Samza 中对此有任何本机支持,但我可以想象一个使用 WindowableTask 的解决方法。

public class PaymentEvent implements Comparable<PaymentEvent> {
    // if current time > timestamp, payment is stuck
    public long timestamp; 
    // we want a corresponding PaymentFailed... event with the same id
    public long interactionId; 
    // PaymentRequest, PaymentAborted, PaymentSucceeded...
    public enum type;
    ...

    @Override
    public int compareTo(PaymentEvent o){
        return timestamp - o.timestamp;
    }
}

现在在您的流程方法中,您将拥有类似的内容:

PriorityQueue<PaymentEvent> pqueue;
Map<Long, PaymentEvent> responses;

public void process(...) {
    PaymentEvent e = new PaymentEvent(envelope.getMessage());
    if (e.enum == PAYMENT_REQUEST) {
        pqueue.add(e);
    } else {
        responses.put(e.interactionId, e);
    }
}

最后,在您的窗口中,您将从优先级队列中弹出所有内容timestamp > current time并检查地图中是否有相应的事件。

public void window(...) {
    while(pqueue.peek().timestamp <= currentTime) {
        if (!map.containsKey(pqueue.poll().interactionId) {
            // send the trigger via the collector
        } 
    }
}

最后,您可以将配置中的窗口时间设置为您想要轮询的时间。配置是task.window.ms.

于 2015-10-21T22:16:15.373 回答