很长一段时间以来,我们一直在使用 spring 集成核心和 spring 集成 amqp 以及 rabbitm-mq。我们的服务还通过使用 x-death 标头和 amaqp-expiration 标头使用死字机制。它曾经可以正常工作,直到我们决定升级 spring-integration 的版本。
以前的版本:5.0.6.RELEASE
新版本:5.2.4.RELEASE
以前版本中的 Rabbit mq 标头
MessageProperties [headers={x-first-death-exchange=dms.arkona.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.arkona.exchange, time=Mon Feb 14 18:03:11 PST 2022, routing-keys=[push.customer.arkona.controller.update.wait.key], queue=push.customer.arkona.controller.update.wait}], x-first-death-reason=expired, x-first-death-queue=push.customer.arkona.controller.update.wait}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.arkona.exchange, receivedRoutingKey=push.customer.arkona.controller.update.key, deliveryTag=2407, consumerTag=amq.ctag-1_bYuEGts-Rfk6fqDyIqBw, consumerQueue=push.customer.arkona.controller.update]), amqp_expiration=10000, id=b77509b2-da63-1be0-ffd8-b96cbdc12c89, timestamp=1644890591793}
新版本中的兔子 mq 标头
headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dms.transformer.exchange, amqp_deliveryTag=7875, amqp_consumerQueue=pull.parts.transformer.controller, amqp_redelivered=false, amqp_receivedRoutingKey=pull.parts.transformer.controller.key, x-first-death-exchange=dms.transformer.exchange, amqp_timestamp=Wed Feb 16 22:13:14 PST 2022, amqp_messageId=0df5a50e-9c15-ad1d-036d-a98993d2fa92, x-death=[{reason=expired, original-expiration=40000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, id=8c7b4d13-5a20-b430-05fc-492ed98971fc, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, </CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161696</TicketNumber><SaleType>R</SaleType><PriceLevel>1</PriceLevel> requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<CustPref>
<Pref value=""/>
<Pref value="text">Y</Pref>
<Pref value="email">Y</Pref>
<Pref value="call">Y</Pref>
<Pref value="phone"/>
<Pref value="postal">Y</Pref>
</CustPref>
<PreferredComm/>
</RepairOrderWrap>
, dms=ARK, original_expiration=10000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=20000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:14 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null<InvoiceNumber/><PurchaseOrderNumber/><Total>139.76</Total></CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161703</TicketNumber><CounterPersonID>054</CounterPersonID><CustomerKey>1090627</CustomerKey><CustName>EURO MOTORS</CustName><CustPhoneNo>7179200375</CustPhoneNo><SaleType>W</SaleType><PriceLevel>15</PriceLevel><OpenOrTranDate>20220117</OpenOrTranDate><InvoiceNumber/><PurchaseOrderNumber>T093945</PurchaseOrderNumber><Total>471.56</Total></CounterTicketSearchResponseSearchResult></Results></CounterTicketSearchResult></CounterTicketSearchResponse>, lastPullTime=2022-02-16T18:12:39Z, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<CustPref>
<Pref value=""/>
<Pref value="text">Y</Pref>
<Pref value="email">Y</Pref>
<Pref value="call">Y</Pref>
<Pref value="phone"/>
<Pref value="postal">Y</Pref>
</CustPref>
<PreferredComm/>
<Customer>
<ID/>
<Version/>
<CustomerKey>1115218</CustomerKey>
<TypeCode/>
<CustomerType/>
<DealerID/>
<FName>DORIS</FName>
<MName>J</MName>
<LName>KAUFFMAN</LName>
<ImageUrl/>
<Indexed/>
<Valid/>
<OptedOut/>
<UpdatedBy/>
<CreatedBy/>
<Communications>
<Communication>
<ID/>
<Version/>
<Type>P</Type>
<Value>7174681776</Value>
<CreatedBy/>
<Preferred/>
<Valid>1</Valid>
<Label>ct_phone</Label>
<Desc/>
<UpdatedBy/>
<CommPrefs/>
</Communication>
</Communications>
</Customer>
<LastModified/>
<StatusCode/>
<StatusMessage/>
</CustWrap>
</RepairOrderWrap>
, dms=ARK, original_expiration=5000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<Customer>
<ID/>
<Version/>
<CustomerKey>1115218</CustomerKey>
<TypeCode/>
<CustomerType/>
<DealerID/>
<FName>DORIS</FName>
<MName>J</MName>
<LName>KAUFFMAN</LName>
<ImageUrl/>
<Indexed/>
<Valid/>
<OptedOut/>
<UpdatedBy/>
<CreatedBy/>
<Communications>
<Communication>
<ID/>
<Version/>
<Type>P</Type>
<Value>7174681776</Value>
<CreatedBy/>
<LastModified/>
<StatusCode/>
<StatusMessage/>
</CustWrap>
</RepairOrderWrap>
, dms=ARK, original_expiration=null}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:44 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, previousResponseXml=<?contentType=application/x-java-serialized-object, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7869, consumerTag=amq.ctag-dZLHnCtvYWzpTvPM4ASzjA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:39 PST 2022, messageId=2da8985b-9c80-240b-fb29-7294035750f7, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-Fw8OPUahC_jNd--kZIdtSg, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:44 PST 2022, messageId=066c4289-5a25-8459-8571-8078421b68d5, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7872, consumerTag=amq.ctag-GYy8qHuHqhZ1s3PyFv5VJA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:54 PST 2022, messageId=5935a433-f7fa-a2c9-f782-d15b4235c91c, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-h6izVs7ziHOH93-M_CBhvw, consumerQueue=pull.parts.transformer.controller]), amqp_consumerTag=amq.ctag-WdkMr705y_K0iJUOnbGTug, contentType=application/x-java-serialized-object, timestamp=1645078434806}]
```
Notice that how spring-integration/spring-integration amqp is adding message body to header and other headers each time the message is passed to dead letter queue
Can someone pls guide us to any setting where in we can disable this repetitive addition of headers . As it is causing the following error in our rabbit mq
caused by: java.lang.IllegalArgumentException: Content headers exceeded max frame size: 132603 > 131072
基本上消息头的大小变得如此之大以至于超出了限制。在 5.0.6 版本中不会发生
更新 :
我们放置了一个自定义键,例如 original-expiration,它的值来自 x-death 标头
public void processFailedMessage(MessageHeaders messageHeaders, HashMap<String,Object> message)
{
if (messageHeaders.containsKey("x-death")) {
List<HashMap<String, Object>> deathList = (List<HashMap<String, Object>>) messageHeaders
.get("x-death");
//logger.debug(message.get("messageId")+" "+deathList);
if (deathList.size() > 0) {
HashMap<String, Object> death = deathList.get(0);
if (death.containsKey("original-expiration")) {
message.put("original_expiration", (String) death.get("original-expiration"));
logger.info("original-expiration = "+death.get("original-expiration"));
}
}
} else {
message.put("original_expiration", null);
}
}
然后我们使用 header-enricher 来丰富 amqp-expiration 标头
public String updateExpiration(HashMap<String, Object> message)
{
//logger.debug(message.get("messageId")+" original_expiration = "+(String) message.get("original_expiration")+" initialexpiration = "+this.initialexpiration+" multiplier = "+this.multiplier+" maximumretries = "+this.maximumretries);
Integer newExpiration = null;
if(message.get("original_expiration") == null )
{
newExpiration = this.initialexpiration;
}
else
{
double x = Math.log((double)(Integer.parseInt((String) message.get("original_expiration"))/this.initialexpiration));
double y = Math.log((double)this.multiplier);
long retryCount = Math.round(x/y);
logger.info(message.get("messageId")+" "+x+" "+y+" Retried "+(retryCount+1)+" of "+this.maximumretries);
if((retryCount + 1) >= this.maximumretries)
{
newExpiration = null;
}
else
{
newExpiration = Integer.parseInt((String) message.get("original_expiration"))*this.multiplier;
}
}
logger.info(" NewExpiration = "+(newExpiration!=null?String.valueOf(newExpiration):null));
return newExpiration!=null?String.valueOf(newExpiration):null;
}
UPDATE :
For dlq logic after updating the expiration header we send to the message to a router where it heck if the value of amqp expiration is not empty or empty. Based on that it sends the message either to a dead letter queue or a failed queuue
<rabbit:queue
name="pull.parts.transformer.controller.wait">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange"
value="dms.transformer.exchange" />
<entry key="x-dead-letter-routing-key"
value="pull.parts.transformer.controller.key" />
</rabbit:queue-arguments>
</rabbit:queue>
@Router(inputChannel="toPullPartsOrderErrorMessageRouter")
public String processPartsOrderFailedMessageExpiration(@Payload HashMap<String,Object> message, @Headers MessageHeaders messageHeaders)
{
logger.info("{} {}",message.get("messageId"),messageHeaders);
if(messageHeaders.containsKey("amqp_expiration"))
{
String expiration = (String) messageHeaders.get("amqp_expiration");
//logger.debug("{} {}",message.get("messageId"),expiration);
if(expiration==null || expiration.isEmpty() || expiration.equals("null"))
{
return FromPullPartsOrderErrorMessageRouterFailed;
}
return FromPullPartsOrderErrorMessageRouterWait;
}
else
{
return FromPullPartsOrderErrorMessageRouterFailed;
}
}
<int:channel id="PullPartsChannel"/>
<int:service-activator input-channel="PullPartsChannel"
ref="syncer" method="onPartsOrderRequestRecievedFromDMS"/>
<int-amqp:inbound-channel-adapter channel="PullPartsChannel"
error-channel = "errorChannelPullParts" queue-names="pull.parts.transformer.controller" concurrent-consumers="${partsorderpullthreads}"
connection-factory="rabbitConnectionFactory" header-mapper="syncerHeaderMapper" prefetch-count="${partsorderpullfetchcount}" />
<int:channel id="fromPullPartsErrorHandler"/>
<int:service-activator id="errorHandlerPullParts" input-channel="errorChannelPullParts" output-channel="fromPullPartsErrorHandler"
ref="errorhelper" method="onErrorInPullParts" />
<int:header-enricher input-channel="fromPullPartsErrorHandler" output-channel="toPullPartsOrderErrorMessageRouter">
<int:header name="amqp_expiration" method="updateExpiration" ref="errorhelper"/>
</int:header-enricher>
<!-- look at com.kaarma.syncer.utility.FailedMessageRouter -->
<int:channel id="fromPullPartsOrderErrorMessageRouterWait"/>
<int-amqp:outbound-channel-adapter
channel="fromPullPartsOrderErrorMessageRouterWait" amqp-template="rabbitTemplate" exchange-name="dms.transformer.exchange"
routing-key="pull.parts.transformer.controller.wait.key" />
<int:channel id="fromPullPartsOrderErrorMessageRouterFailed"/>
<int-amqp:outbound-channel-adapter
channel="fromPullPartsOrderErrorMessageRouterFailed" amqp-template="rabbitTemplate" exchange-name="dms.transformer.exchange"
routing-key="pull.parts.transformer.controller.failed.key" />