我不知道 Samza 中对此有任何本机支持,但我可以想象一个使用 WindowableTask 的解决方法。
public class PaymentEvent implements Comparable<PaymentEvent> {
// if current time > timestamp, payment is stuck
public long timestamp;
// we want a corresponding PaymentFailed... event with the same id
public long interactionId;
// PaymentRequest, PaymentAborted, PaymentSucceeded...
public enum type;
...
@Override
public int compareTo(PaymentEvent o){
return timestamp - o.timestamp;
}
}
现在在您的流程方法中,您将拥有类似的内容:
PriorityQueue<PaymentEvent> pqueue;
Map<Long, PaymentEvent> responses;
public void process(...) {
PaymentEvent e = new PaymentEvent(envelope.getMessage());
if (e.enum == PAYMENT_REQUEST) {
pqueue.add(e);
} else {
responses.put(e.interactionId, e);
}
}
最后,在您的窗口中,您将从优先级队列中弹出所有内容timestamp > current time
并检查地图中是否有相应的事件。
public void window(...) {
while(pqueue.peek().timestamp <= currentTime) {
if (!map.containsKey(pqueue.poll().interactionId) {
// send the trigger via the collector
}
}
}
最后,您可以将配置中的窗口时间设置为您想要轮询的时间。配置是task.window.ms
.