0

我正在学习 Spring-Integration,并对 Gateway 和 Service-Activators 有基本的了解。我喜欢网关的概念。Spring Integration 在运行时为网关生成代理。此代理对网关的消费者隐藏所有消息传递细节。此外,生成的代理也可能关联请求和回复。

出于学习的目的,我开始使用原始 Spring Integration 功能而不是使用网关来实现请求和回复相关性。我可以在请求标头中设置相关标识符,但在接收通道回复时无法指定相关标识符。以下(在问题的末尾)是相同的代码片段。此外,相关内容如何针对消息代理(例如 RabbitMQ)起作用?RabbitMQ 是否提供了检索带有特定标头(相关标识符)的消息的能力?

public class RemoteProxyCalculatorService implements CalculatorService
{
    public int Square(int n) 
    {
        UUID uuid = SendRequest(n, "squareRequestChannel");
        int squareOfn = ReceiveReply("squareReplyChannel", uuid);
        return squareOfn;
    }

    private <T> UUID SendRequest(T payload, String requestChannel)
    {
        UUID requestID = UUID.randomUUID();
        Message<T> inputMessage = MessageBuilder.withPayload(payload)
                .setCorrelationId(requestID)
                .build();

        MessageChannel channel = (MessageChannel)context.getBean(requestChannel, MessageChannel.class);
        channel.send(inputMessage);
        return requestID;
    }

    @SuppressWarnings("unchecked")
    private <T> T ReceiveReply(String replyChannel, UUID requestID)
    {
        //How to consume requestID so as to receive only the reply related to the request posted by this thread
        PollableChannel channel = (PollableChannel)context.getBean(replyChannel);
        Message<?> groupMessage = channel.receive();
        return (T)groupMessage.getPayload();
    }

    private ClassPathXmlApplicationContext context;
}

谢谢。

4

3 回答 3

1

问题在于公共回复渠道。解决方案(Mark 建议类似)将如下所示。

public class RemoteProxyCalculatorService
{
  public int Square(int n)
  {
    PollableChannel replyChannel = SendRequest(n, "squareRequestChannel");
    int squareOfn = ReceiveReply(replyChannel);
    return squareOfn;
  }

  private <T> PollableChannel SendRequest(T payload, String requestChannel)
  {
    UUID requestID = UUID.randomUUID();
    QueueChannel replyQueueChannel = new QueueChannel();
    Message<T> inputMessage = MessageBuilder.withPayload(payload)
        .setCorrelationId(requestID)
        .setReplyChannel(replyQueueChannel)
        .build();
    MessageChannel channel = context.getBean(requestChannel, MessageChannel.class);
    channel.send(inputMessage);
    return replyQueueChannel;
  }

  @SuppressWarnings("unchecked")
  private <T> T ReceiveReply(PollableChannel replyChannel)
  {
    Message<?> groupMessage = replyChannel.receive();
    return (T) groupMessage.getPayload();
  }

  private ClassPathXmlApplicationContext context;
}
于 2013-07-12T04:58:26.000 回答
1

在应用程序中进行关联的最简单方法甚至不需要correlationId 标头。相反,您可以创建一个 QueueChannel 实例(您不共享该实例)并将其作为您发送的 Message 上的 replyChannel 标头提供。无论下游组件最终响应什么,它都会在 Message 中找到该标头。

关于 RabbitMQ,我们的出站网关只是应用了类似的技术,但使用了 AMQP 消息的 replyTo 属性。

希望有帮助。

-标记

于 2013-07-10T19:43:59.893 回答
0

如果您想使用通用回复渠道,那么我认为这就是您要寻找的。

public class RemoteProxyCalculatorService
{
  public int Square(int n)
  {
    PollableChannel replyChannel = SendRequest(n, "squareRequestChannel");
    int squareOfn = ReceiveReply(replyChannel);
    return squareOfn;
  }

  private <T> PollableChannel SendRequest(T payload, String requestChannel)
  {
    UUID requestID = UUID.randomUUID();
    Message<T> inputMessage = MessageBuilder.withPayload(payload)
        .setCorrelationId(requestID)
        .setReplyChannel(myMessageHandler.getSubscribedChannel())
        .build();

    // Create a Pollable channel for two things

    // 1. Pollable channel is where this thread should look for reply.
    QueueChannel replyQueueChannel = new QueueChannel();

    // 2. Message Handler will send reply to this Pollable channel once it receives the reply using correlation Id.
    myMessageHandler.add(requestID, replyQueueChannel);

    MessageChannel channel = context.getBean(requestChannel, MessageChannel.class);
    channel.send(inputMessage);

    return replyQueueChannel;
  }

  @SuppressWarnings("unchecked")
  private <T> T ReceiveReply(PollableChannel replyChannel)
  {
    Message<?> groupMessage = replyChannel.receive();
    return (T) groupMessage.getPayload();
  }

  private ClassPathXmlApplicationContext context;

  @Autowired
  private MyMessageHandler myMessageHandler;
}

/**
 * Message Handler
 * 
 */
public class MyMessageHandler implements MessageHandler
{
  private final Map<Object, MessageChannel> idChannelsMap = new TreeMap<>();
  private final Object lock = new Object();
  private final SubscribableChannel subscribedChannel;

  public MyMessageHandler(SubscribableChannel subscribedChannel)
  {
    this.subscribedChannel = subscribedChannel;
  }

  @Override
  public void handleMessage(Message<?> message) throws MessagingException
  {
    synchronized (lock)
    {
      this.idChannelsMap.get(message.getHeaders().getCorrelationId()).send(message);
      this.idChannelsMap.remove(message.getHeaders().getCorrelationId());
    }
  }

  public void add(Object correlationId, MessageChannel messageChannel)
  {
    synchronized (lock)
    {
      this.idChannelsMap.put(correlationId, messageChannel);
    }
  }

  public SubscribableChannel getSubscribedChannel()
  {
    return subscribedChannel;
  }

}
于 2013-07-12T09:13:29.280 回答