我有以下 PHP 应用程序。这会将用户注册发布到消息队列。Java 应用程序从该队列中读取并导入它。希望下面的图表能够描述它。我只处理 Java 方面的事情。json 消息已经存在于队列中。
路线(Java 消费方)。
@Component
public class SignUpRouting {
errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());
from("rabbitmq://phpSignUp.exchange?username=etc....")
.routeId("signUpRoute")
.processRef("signUpProcessor")
.end();
//....
处理器..
@Component
public class SignupProcessor implements Processor {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void process(Exchange exchange) throws Exception {
String json = exchange.getIn().getBody(String.class);
SignUpDto dto = mapper.readValue(json, SignUpDto.class);
SignUp signUp = new SignUp();
signUp.setWhatever(dto.getWhatever());
//etc....
// save record
signUpDao.save(signUp);
}
}
我的问题是这个..当处理器无法导入消息时我该怎么办。
例如,假设有一个 DAO 异常。数据字段可能过长或导入格式不正确。我不想丢失消息。我想查看错误并重试导入。但我不想每 30 秒重试一次消息。
我在想我需要创建另一个队列..一个死信队列并无限期地每 6 小时重试一次消息?.. 然后我会查看日志以查看错误并上传修复程序,然后重新处理消息?
我将如何实施?还是我走错了路?
编辑 我尝试设置 deadLetterExchange 以查看是否可以使事情朝着正确的方向发展...但是它出错并说队列不能为非空
rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange