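For a long time we have been using spring-integration-core and spring-integration-amqp with RabbitMQ. Our services also use a dead-letter mechanism that relies on the x-death header and the amqp_expiration header. It worked fine until we decided to upgrade spring-integration.

Previous version: 5.0.6.RELEASE

New version: 5.2.4.RELEASE

RabbitMQ headers in the previous version: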

    MessageProperties [headers={x-first-death-exchange=dms.arkona.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.arkona.exchange, time=Mon Feb 14 18:03:11 PST 2022, routing-keys=[push.customer.arkona.controller.update.wait.key], queue=push.customer.arkona.controller.update.wait}], x-first-death-reason=expired, x-first-death-queue=push.customer.arkona.controller.update.wait}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.arkona.exchange, receivedRoutingKey=push.customer.arkona.controller.update.key, deliveryTag=2407, consumerTag=amq.ctag-1_bYuEGts-Rfk6fqDyIqBw, consumerQueue=push.customer.arkona.controller.update]), amqp_expiration=10000, id=b77509b2-da63-1be0-ffd8-b96cbdc12c89, timestamp=1644890591793}

RabbitMQ headers in the new version:

   headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dms.transformer.exchange, amqp_deliveryTag=7875, amqp_consumerQueue=pull.parts.transformer.controller, amqp_redelivered=false, amqp_receivedRoutingKey=pull.parts.transformer.controller.key, x-first-death-exchange=dms.transformer.exchange, amqp_timestamp=Wed Feb 16 22:13:14 PST 2022, amqp_messageId=0df5a50e-9c15-ad1d-036d-a98993d2fa92, x-death=[{reason=expired, original-expiration=40000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, id=8c7b4d13-5a20-b430-05fc-492ed98971fc, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, </CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161696</TicketNumber><SaleType>R</SaleType><PriceLevel>1</PriceLevel> requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
           <Dealer>
              <EnterpriseCode>OTIM</EnterpriseCode>
              <CompanyNumber>OT2</CompanyNumber>
              <ServerName>itrack7.arkona.com</ServerName>
           </Dealer>
           <LookupParms>
               <TicketNumber>161696</TicketNumber>
           </LookupParms>
  </CounterTicketLookup>
  , messageXml=<RepairOrderWrap>
     <Dealer>
        <DealerID/>
        <DealerName/>
        <DealerDMS>
           <DealerDMSID/>
           <DMSName/>
           <UserName/>
           <Password/>
           <HostUrl/>
           <EnterpriseCode/>
           <CompanyNumber/>
           <ServerName/>
        </DealerDMS>
     </Dealer>
     <RepairOrder>
        <ID/>
        <OrderNumber>161696</OrderNumber>
        <OrderType>PO</OrderType>
        <Amount>227.90</Amount>
        <OrderDate>20220117 00:00:00</OrderDate>
        <CloseDate/>
        <PrintDate>20220117 00:00:00</PrintDate>
        <DealerAssociateID/>
        <AssociateDMSID>870</AssociateDMSID>
        <Description/>
        <OrderStatus>P</OrderStatus>
        <NumberOfInvoices/>
        <ReadyROData/>
        <IsPaid/>
        <PaidAmount/>
        <IsPaidInKaarma/>
        <IsPaymentRequestSent/>
        <Tag/>
        <MileageText/>
        <InvoiceUrl/>
     </RepairOrder>
     <CustWrap>
        <CAttrs>
           <IsBusiness/>
           <AssignedSA/>
           <Comments/>
        </CAttrs>
        <IdentityAttrs>
           <Attr value="SSN"/>
           <Attr value="DriverLicense"/>
           <Attr value="BirthDate"/>
           <Attr value="Gender"/>
           <Attr value="Language"/>
        </IdentityAttrs>
        <CustPref>
           <Pref value=""/>
           <Pref value="text">Y</Pref>
           <Pref value="email">Y</Pref>
           <Pref value="call">Y</Pref>
           <Pref value="phone"/>
           <Pref value="postal">Y</Pref>
        </CustPref>
        <PreferredComm/>
        
             
  </RepairOrderWrap>
  , dms=ARK, original_expiration=10000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=20000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:14 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null<InvoiceNumber/><PurchaseOrderNumber/><Total>139.76</Total></CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161703</TicketNumber><CounterPersonID>054</CounterPersonID><CustomerKey>1090627</CustomerKey><CustName>EURO MOTORS</CustName><CustPhoneNo>7179200375</CustPhoneNo><SaleType>W</SaleType><PriceLevel>15</PriceLevel><OpenOrTranDate>20220117</OpenOrTranDate><InvoiceNumber/><PurchaseOrderNumber>T093945</PurchaseOrderNumber><Total>471.56</Total></CounterTicketSearchResponseSearchResult></Results></CounterTicketSearchResult></CounterTicketSearchResponse>, lastPullTime=2022-02-16T18:12:39Z, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
           <Dealer>
              <EnterpriseCode>OTIM</EnterpriseCode>
              <CompanyNumber>OT2</CompanyNumber>
              <ServerName>itrack7.arkona.com</ServerName>
           </Dealer>
           <LookupParms>
               <TicketNumber>161696</TicketNumber>
           </LookupParms>
  </CounterTicketLookup>
  , messageXml=<RepairOrderWrap>
     <Dealer>
        <DealerID/>
        <DealerName/>
        <DealerDMS>
           <DealerDMSID/>
           <DMSName/>
           <UserName/>
           <Password/>
           <HostUrl/>
           <EnterpriseCode/>
           <CompanyNumber/>
           <ServerName/>
        </DealerDMS>
     </Dealer>
     <RepairOrder>
        <ID/>
        <OrderNumber>161696</OrderNumber>
        <OrderType>PO</OrderType>
        <Amount>227.90</Amount>
        <OrderDate>20220117 00:00:00</OrderDate>
        <CloseDate/>
        <PrintDate>20220117 00:00:00</PrintDate>
        <DealerAssociateID/>
        <AssociateDMSID>870</AssociateDMSID>
        <Description/>
        <OrderStatus>P</OrderStatus>
        <NumberOfInvoices/>
        <ReadyROData/>
        <IsPaid/>
        <PaidAmount/>
        <IsPaidInKaarma/>
        <IsPaymentRequestSent/>
        <Tag/>
        <MileageText/>
        <InvoiceUrl/>
     </RepairOrder>
     <CustWrap>
        <CAttrs>
           <IsBusiness/>
           <AssignedSA/>
           <Comments/>
        </CAttrs>
        <IdentityAttrs>
           <Attr value="SSN"/>
           <Attr value="DriverLicense"/>
           <Attr value="BirthDate"/>
           <Attr value="Gender"/>
           <Attr value="Language"/>
        </IdentityAttrs>
        <CustPref>
           <Pref value=""/>
           <Pref value="text">Y</Pref>
           <Pref value="email">Y</Pref>
           <Pref value="call">Y</Pref>
           <Pref value="phone"/>
           <Pref value="postal">Y</Pref>
        </CustPref>
        <PreferredComm/>
        <Customer>
           <ID/>
           <Version/>
           <CustomerKey>1115218</CustomerKey>
           <TypeCode/>
           <CustomerType/>
           <DealerID/>
           <FName>DORIS</FName>
           <MName>J</MName>
           <LName>KAUFFMAN</LName>
           <ImageUrl/>
           <Indexed/>
           <Valid/>
           <OptedOut/>
           <UpdatedBy/>
           <CreatedBy/>
           <Communications>
              <Communication>
                 <ID/>
                 <Version/>
                 <Type>P</Type>
                 <Value>7174681776</Value>
                 <CreatedBy/>
                 <Preferred/>
                 <Valid>1</Valid>
                 <Label>ct_phone</Label>
                 <Desc/>
                 <UpdatedBy/>
                 <CommPrefs/>
              </Communication>
           </Communications>
        </Customer>
        <LastModified/>
        <StatusCode/>
        <StatusMessage/>
     </CustWrap>
  </RepairOrderWrap>
  , dms=ARK, original_expiration=5000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
           <Dealer>
              <EnterpriseCode>OTIM</EnterpriseCode>
              <CompanyNumber>OT2</CompanyNumber>
              <ServerName>itrack7.arkona.com</ServerName>
           </Dealer>
           <LookupParms>
               <TicketNumber>161696</TicketNumber>
           </LookupParms>
  </CounterTicketLookup>
  , messageXml=<RepairOrderWrap>
     <Dealer>
        <DealerID/>
        <DealerName/>
        <DealerDMS>
           <DealerDMSID/>
           <DMSName/>
           <UserName/>
           <Password/>
           <HostUrl/>
           <EnterpriseCode/>
           <CompanyNumber/>
           <ServerName/>
        </DealerDMS>
     </Dealer>
     <RepairOrder>
        <ID/>
        <OrderNumber>161696</OrderNumber>
        <OrderType>PO</OrderType>
        <Amount>227.90</Amount>
        <OrderDate>20220117 00:00:00</OrderDate>
        <CloseDate/>
        <PrintDate>20220117 00:00:00</PrintDate>
        <DealerAssociateID/>
        <AssociateDMSID>870</AssociateDMSID>
        <Description/>
        <OrderStatus>P</OrderStatus>
        <NumberOfInvoices/>
        <ReadyROData/>
        <IsPaid/>
        <PaidAmount/>
        <IsPaidInKaarma/>
        <IsPaymentRequestSent/>
        <Tag/>
        <MileageText/>
        <InvoiceUrl/>
     </RepairOrder>
     <CustWrap>
        <CAttrs>
           <IsBusiness/>
           <AssignedSA/>
           <Comments/>
        </CAttrs>
        <IdentityAttrs>
           <Attr value="SSN"/>
           <Attr value="DriverLicense"/>
           <Attr value="BirthDate"/>
           <Attr value="Gender"/>
           <Attr value="Language"/>
        </IdentityAttrs>
      
        <Customer>
           <ID/>
           <Version/>
           <CustomerKey>1115218</CustomerKey>
           <TypeCode/>
           <CustomerType/>
           <DealerID/>
           <FName>DORIS</FName>
           <MName>J</MName>
           <LName>KAUFFMAN</LName>
           <ImageUrl/>
           <Indexed/>
           <Valid/>
           <OptedOut/>
           <UpdatedBy/>
           <CreatedBy/>
           <Communications>
              <Communication>
                 <ID/>
                 <Version/>
                 <Type>P</Type>
                 <Value>7174681776</Value>
                 <CreatedBy/>
                
        <LastModified/>
        <StatusCode/>
        <StatusMessage/>
     </CustWrap>
  </RepairOrderWrap>
  , dms=ARK, original_expiration=null}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:44 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, previousResponseXml=<?contentType=application/x-java-serialized-object, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7869, consumerTag=amq.ctag-dZLHnCtvYWzpTvPM4ASzjA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:39 PST 2022, messageId=2da8985b-9c80-240b-fb29-7294035750f7, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, 
deliveryTag=7873, consumerTag=amq.ctag-Fw8OPUahC_jNd--kZIdtSg, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:44 PST 2022, messageId=066c4289-5a25-8459-8571-8078421b68d5, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7872, consumerTag=amq.ctag-GYy8qHuHqhZ1s3PyFv5VJA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:54 PST 2022, messageId=5935a433-f7fa-a2c9-f782-d15b4235c91c, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-h6izVs7ziHOH93-M_CBhvw, consumerQueue=pull.parts.transformer.controller]), amqp_consumerTag=amq.ctag-WdkMr705y_K0iJUOnbGTug, contentType=application/x-java-serialized-object, timestamp=1645078434806}]
  
Notice how spring-integration / spring-integration-amqp adds the message body and the other headers to a header (sourceData) each time the message passes through the dead letter queue.

Can someone please point us to a setting that disables this repeated addition of headers? It is causing the following error in our RabbitMQ client:

caused by: java.lang.IllegalArgumentException: Content headers exceeded max frame size: 132603 > 131072

Basically, the message headers grow so large that they exceed the limit. This did not happen in version 5.0.6.
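To make the growth concrete, here is a rough Java sketch. It assumes that each trip through the wait queue wraps the previous message (its body plus all of its headers) inside a new header, which is what the nested sourceData entries in the log suggest. The class name, the body size, and the per-hop header overhead are made-up values for illustration; only the 131072-byte limit comes from the exception above.

```java
// Rough model only; HeaderGrowthSketch and the sizes passed to it are hypothetical.
final class HeaderGrowthSketch {

    // Limit quoted in the IllegalArgumentException.
    static final int MAX_FRAME_SIZE = 131072;

    /** Header size after the given number of hops, if every hop wraps the previous message. */
    static long headerSizeAfter(int hops, int bodyBytes, int baseHeaderBytes) {
        long headers = baseHeaderBytes;
        for (int i = 0; i < hops; i++) {
            // New headers = fresh base headers + a copy of the body + all previous headers.
            headers = baseHeaderBytes + bodyBytes + headers;
        }
        return headers;
    }

    /** First hop count at which the headers no longer fit into a single frame. */
    static int firstFailingHop(int bodyBytes, int baseHeaderBytes) {
        if (bodyBytes + baseHeaderBytes <= 0) {
            throw new IllegalArgumentException("sizes must add up to a positive number");
        }
        int hops = 0;
        while (headerSizeAfter(hops, bodyBytes, baseHeaderBytes) <= MAX_FRAME_SIZE) {
            hops++;
        }
        return hops;
    }

    public static void main(String[] args) {
        // Assumed sizes: a 30,000-byte body and 1,000 bytes of ordinary headers.
        System.out.println(firstFailingHop(30_000, 1_000)); // prints 5
    }
}
```

Under this model the header size grows by roughly one body copy per retry, so a message with a large body runs out of frame space after only a handful of retries.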

Update:

We store a custom key, original_expiration, in the message; its value comes from the x-death header:

public void processFailedMessage(MessageHeaders messageHeaders, HashMap<String,Object> message)
    {
        if (messageHeaders.containsKey("x-death")) {
            List<HashMap<String, Object>> deathList = (List<HashMap<String, Object>>) messageHeaders
                    .get("x-death");
            //logger.debug(message.get("messageId")+" "+deathList);
            if (deathList.size() > 0) {
                HashMap<String, Object> death = deathList.get(0);
                if (death.containsKey("original-expiration")) {
                    message.put("original_expiration", (String) death.get("original-expiration"));
                    logger.info("original-expiration = "+death.get("original-expiration"));
                } 
            } 
        } else {
            message.put("original_expiration", null);
        }
    }

Then we use a header-enricher to set the amqp_expiration header:

public String updateExpiration(HashMap<String, Object> message)
    {
        //logger.debug(message.get("messageId")+" original_expiration = "+(String) message.get("original_expiration")+" initialexpiration = "+this.initialexpiration+" multiplier = "+this.multiplier+" maximumretries = "+this.maximumretries);
        Integer newExpiration = null;
        if(message.get("original_expiration") == null )
        {
            newExpiration = this.initialexpiration;
        
        }
        else
        {   
            // Divide in floating point; integer division would truncate the ratio before the logarithm.
            double x = Math.log(Double.parseDouble((String) message.get("original_expiration")) / this.initialexpiration);
            double y = Math.log((double)this.multiplier);
            long retryCount = Math.round(x/y);
            logger.info(message.get("messageId")+" "+x+" "+y+" Retried "+(retryCount+1)+" of "+this.maximumretries);
            if((retryCount + 1) >= this.maximumretries)
            {
                newExpiration = null;
            }
            else
            {
                newExpiration = Integer.parseInt((String) message.get("original_expiration"))*this.multiplier;
            }
        }
        logger.info(" NewExpiration = "+(newExpiration!=null?String.valueOf(newExpiration):null));
        return newExpiration!=null?String.valueOf(newExpiration):null;
    }
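As a worked example of the calculation above, the following self-contained sketch reproduces the same back-off rule outside of Spring. The class and method names are hypothetical, and the parameter values (initial expiration 5000 ms, multiplier 2, at most 4 retries) are assumptions chosen to match the original-expiration values 5000, 10000, 20000 and 40000 seen in the log.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: BackoffSketch and its parameters are hypothetical names,
// not part of the original service.
final class BackoffSketch {

    /** Returns the next TTL in milliseconds, or null once the retry budget is spent. */
    static Integer nextExpiration(Integer originalExpiration, int initial, int multiplier, int maxRetries) {
        if (originalExpiration == null) {
            return initial; // first failure: start with the initial TTL
        }
        // Recover how many times the TTL has already been multiplied.
        double ratio = (double) originalExpiration / initial;
        long retryCount = Math.round(Math.log(ratio) / Math.log(multiplier));
        if (retryCount + 1 >= maxRetries) {
            return null; // no expiration: the router sends the message to the failed queue
        }
        return originalExpiration * multiplier;
    }

    /** Lists every TTL a message would receive before the retries are exhausted. */
    static List<Integer> schedule(int initial, int multiplier, int maxRetries) {
        List<Integer> ttls = new ArrayList<>();
        Integer ttl = nextExpiration(null, initial, multiplier, maxRetries);
        while (ttl != null) {
            ttls.add(ttl);
            ttl = nextExpiration(ttl, initial, multiplier, maxRetries);
        }
        return ttls;
    }

    public static void main(String[] args) {
        System.out.println(schedule(5000, 2, 4)); // prints [5000, 10000, 20000, 40000]
    }
}
```

The logarithm is only used to count how many retries have already happened; the result is rounded because floating-point logarithms of exact powers are not always whole numbers.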




Update:

For the DLQ logic: after updating the expiration header, we send the message to a router that checks whether the amqp_expiration value is empty. Based on that, it sends the message either to a dead letter queue or to a failed queue.

<rabbit:queue
        name="pull.parts.transformer.controller.wait">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange"
                value="dms.transformer.exchange" />
            <entry key="x-dead-letter-routing-key"
                value="pull.parts.transformer.controller.key" />
        </rabbit:queue-arguments>
    </rabbit:queue>


@Router(inputChannel="toPullPartsOrderErrorMessageRouter")
    public String processPartsOrderFailedMessageExpiration(@Payload HashMap<String,Object> message, @Headers MessageHeaders messageHeaders) 
    {
        logger.info("{} {}",message.get("messageId"),messageHeaders);
        if(messageHeaders.containsKey("amqp_expiration"))
        {
            String expiration = (String) messageHeaders.get("amqp_expiration");
            //logger.debug("{} {}",message.get("messageId"),expiration);
            if(expiration==null || expiration.isEmpty() || expiration.equals("null"))
            {
                return FromPullPartsOrderErrorMessageRouterFailed;
            }
            return FromPullPartsOrderErrorMessageRouterWait;
            
        }
        else
        {
            return FromPullPartsOrderErrorMessageRouterFailed;
        }
    }


<int:channel id="PullPartsChannel"/>

<int:service-activator input-channel="PullPartsChannel" 
        ref="syncer" method="onPartsOrderRequestRecievedFromDMS"/>
       

       

<int-amqp:inbound-channel-adapter channel="PullPartsChannel"
         error-channel = "errorChannelPullParts"  queue-names="pull.parts.transformer.controller" concurrent-consumers="${partsorderpullthreads}" 
         connection-factory="rabbitConnectionFactory" header-mapper="syncerHeaderMapper" prefetch-count="${partsorderpullfetchcount}" />
        
<int:channel id="fromPullPartsErrorHandler"/>
<int:service-activator id="errorHandlerPullParts" input-channel="errorChannelPullParts" output-channel="fromPullPartsErrorHandler" 
        ref="errorhelper" method="onErrorInPullParts" />
<int:header-enricher input-channel="fromPullPartsErrorHandler" output-channel="toPullPartsOrderErrorMessageRouter">
               <int:header name="amqp_expiration" method="updateExpiration" ref="errorhelper"/>
</int:header-enricher>      
<!-- look at  com.kaarma.syncer.utility.FailedMessageRouter -->
<int:channel id="fromPullPartsOrderErrorMessageRouterWait"/>        
<int-amqp:outbound-channel-adapter
        channel="fromPullPartsOrderErrorMessageRouterWait" amqp-template="rabbitTemplate" exchange-name="dms.transformer.exchange"
        routing-key="pull.parts.transformer.controller.wait.key" /> 
<int:channel id="fromPullPartsOrderErrorMessageRouterFailed"/>      
<int-amqp:outbound-channel-adapter
        channel="fromPullPartsOrderErrorMessageRouterFailed" amqp-template="rabbitTemplate" exchange-name="dms.transformer.exchange"
        routing-key="pull.parts.transformer.controller.failed.key" />

        

2 Answers

This worked for me:

@Bean
public DefaultAmqpHeaderMapper outboundMapper() {
    DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.outboundMapper();
    amqpHeaderMapper.setRequestHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REQUEST_HEADER_NAME_PATTERN);
    amqpHeaderMapper.setReplyHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REPLY_HEADER_NAME_PATTERN);
    return amqpHeaderMapper;
}

I used this bean in the outbound-channel-adapter when sending messages from the Spring Integration channel to RabbitMQ.

Answered 2022-02-28T10:11:01.207

You can configure the <int-amqp:outbound-channel-adapter> to map only the headers you are interested in. By default, it maps all of them:

private static String[] safeOutboundHeaders() {
    return new String[] { "!x-*", "*" };
}

See the mapped-request-headers attribute and the documentation: https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-message-headers
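To illustrate how a pattern list such as { "!x-*", "*" } selects headers, here is a simplified, self-contained model in plain Java. It is not Spring Integration's implementation; the class and method names are hypothetical, and it assumes only that the first matching pattern decides and that a leading "!" turns a match into an exclusion. The header names are taken from the log in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model for illustration; HeaderPatternSketch is a hypothetical name
// and this is not Spring Integration's actual code.
final class HeaderPatternSketch {

    /** Matches a name against a pattern in which '*' stands for any run of characters. */
    static boolean simpleMatch(String pattern, String name) {
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return name.matches(regex.toString());
    }

    /** The first matching pattern decides; a leading '!' turns a match into an exclusion. */
    static boolean shouldMap(String header, String... patterns) {
        for (String p : patterns) {
            boolean negated = p.startsWith("!");
            String body = negated ? p.substring(1) : p;
            if (simpleMatch(body, header)) {
                return !negated;
            }
        }
        return false; // no pattern matched: the header is not mapped
    }

    /** Returns the headers that would be copied to the outgoing message. */
    static List<String> mapped(List<String> headers, String... patterns) {
        List<String> result = new ArrayList<>();
        for (String h : headers) {
            if (shouldMap(h, patterns)) {
                result.add(h);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> headers = Arrays.asList(
                "amqp_expiration", "x-death", "x-first-death-queue", "sourceData");
        // Default-style list: only the x-* headers are skipped.
        System.out.println(mapped(headers, "!x-*", "*"));
        // Stricter list that also excludes sourceData.
        System.out.println(mapped(headers, "!sourceData", "!x-*", "*"));
    }
}
```

In this model the default-style list still lets sourceData through, while adding "!sourceData" in front drops it. Whether excluding that one header is enough for the setup in the question is an assumption that would need to be checked against the real mapper.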

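It is not clear what was different before, such that these extra headers did not show up on the messages you send to this AMQP channel adapter, because the "copy all headers" logic has always been there.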

Answered 2022-02-22T17:18:29.290