spring amqp-outbound网关从不同的thead生成回复(如jms-outbound网关)

问题陈述:

Spring amqp-outbound网关从不同的线程产生回复(类似于jms-outbound网关,具有不同的队列,使用相关键关联请求/响应)。

无法将此消息与此示例相关联。

Spring集成

                

配置

 @Bean public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); template.setQueue("reply_queue"); return template; } @Bean public Binding binding(){ return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue"); } @Bean public DirectExchange exchange(){ return new DirectExchange("request_exchange"); } @Bean public Queue queue(){ return new Queue("request_queue", true, false, true); } @Bean public Binding bindingReply(){ return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue"); } @Bean public DirectExchange exchangeReply(){ return new DirectExchange("reply_exchange"); } @Bean public Queue replyQueue(){ return new Queue("reply_queue", true, false, true); } 

服务

 @Service public final class OuboundService { public Message createRequest(String message){ System.out.println("Inside createRequest : "+ message); final String transactionId = UUID.randomUUID().toString(); final Message builtMessage = MessageBuilder.withBody(message.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setHeader(AmqpHeaders.CORRELATION_ID, transactionId) .build(); return builtMessage; } public Message processRequest(Message message){ System.out.println("Inside process Request : "+ new String(message.getBody())); System.out.println("Header values : "+message.getMessageProperties().getHeaders()); final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties()) .copyHeaders(message.getMessageProperties().getHeaders()).build(); return result; } } 

错误:

org.springframework.integration.handler.ReplyRequiredException:处理程序’outboundGtwyId’没有产生回复,其’requiresReply’属性设置为true。

GitHub源代码(已解决的解决方案)

https://github.com/kingkongprab/spring-amqp-outbound-gateway

相关性也在Spring AMQP中完成。 有关详细信息,请参阅其RabbitTemplate#sendAndRecevie() 。 参考手册中也有关于此事的良好文档。

与其AbstractAmqpOutboundEndpointAmqpInboundGateway实现的Spring集成提供了开箱即用的请求 – 应答相关解决方案。 如果您无法在服务器端使用AmqpInboundGateway ,则应确保从接收的请求到回复的correlationId传输。 是的,您可以使用专用交换进行回复,这是RabbitTemplate#setQueue()支持等待客户端,出站端的回复。 但如果没有适当的correlation转移,这仍然无法奏效。 另请参阅https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-message-headers ,了解如何在Spring中映射标头(包括correlationId )的信息积分。

UPDATE

感谢您分享您的申请。

好吧,现在我看到几个问题:

  1. 你肯定错过了replyQueue绑定:

     @Bean public Binding bindingReply(){ return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue"); } 
  2. RabbitTemplate必须使用setReplyAddress() 。 您必须为reply_queue配置MessageListenerContainer并将RabbitTemplate作为侦听器:

     @Bean public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); template.setReplyAddress(replyQueue().getName()); return template; } @Bean public MessageListenerContainer replyContainer(RabbitTemplate template) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(template.getConnectionFactory()); container.setQueues(replyQueue()); container.setMessageListener(template); return container; } 
  3. 使用org.springframework.amqp.core.Message操作的OuboundService是无用的。 通道适配器不知道这种类型的payload ,您的自定义Message只是作为另一个org.springframework.amqp.core.Message的序列化body 。 我把它改成了这一切,一切运作良好:

     public String createRequest(String message){ System.out.println("Inside createRequest : "+ message); return message; } public Message processRequest(Message message){ System.out.println("Inside process Request : " + message); return message; } 

无论如何,我建议你重新考虑你的设计,然后回到AmqpInboundGateway

BTW在最终解决方案中,您不需要关心任何correlation 。 框架会自动为您完成。