我正在使用 Samza 处理来自 Kafka 主题的消息。某些消息在将来带有时间戳,我想将处理推迟到该时间戳之后。与此同时,我想继续处理其他传入的消息。
我试图做的是让我Task
的消息排队并实现WindowableTask
定期检查消息,如果它们的时间戳允许处理它们。基本思想如下所示:
public class MyTask implements StreamTask, WindowableTask {
private HashSet<MyMessage> waitingMessages = new HashSet<>();
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
// Do the processing
} else {
waitingMessages.add(parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
for (MyMessage message : waitingMessages) {
if (message.getValidFromDateTime().isBeforeNow()) {
// Do the processing and remove the message from the set
}
}
}
}
这显然有一些缺点。当我重新部署任务时,我会丢失内存中的等待消息。所以我想知道使用 Samza 延迟处理消息的最佳实践。我是否需要一次又一次地将消息重新发送到同一主题,直到我最终可以处理它们?我们在这里谈论将处理延迟几分钟,最多 1-2 小时。