我正在尝试实现简单的服务,从 kafka 中提取消息,将它们包装在一些数据中并发送到外部服务。
处理消息时处理外部服务不可用的常见模式是什么?
到目前为止,我仅在对外部服务的请求成功时才手动提交消息。如果消息未提交,我希望 kafka 在一段时间后重新发送消息,以便处理外部服务失败对消费者来说是透明的。我找不到办法做到这一点。但是,如果我不做一些反模式并且有更好的解决方案,我很好奇。
我正在尝试实现简单的服务,从 kafka 中提取消息,将它们包装在一些数据中并发送到外部服务。
处理消息时处理外部服务不可用的常见模式是什么?
到目前为止,我仅在对外部服务的请求成功时才手动提交消息。如果消息未提交,我希望 kafka 在一段时间后重新发送消息,以便处理外部服务失败对消费者来说是透明的。我找不到办法做到这一点。但是,如果我不做一些反模式并且有更好的解决方案,我很好奇。
首先你需要考虑,Kafka 是基于拉的。因此,如果您想第二次接收消息,您需要seek()
将其偏移量和poll()
.
此外,如果您想停止处理消息,您可以对它们进行pause()
分区和稍后处理resume()
。请参阅Consumer JavaDoc 中的“消费流控制”部分: https ://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
因此,如果您的外部服务已关闭,只需暂停并等待它恢复即可。