spring amqp-outbound网关从不同的thead生成回复(如jms-outbound网关)
问题陈述:
Spring amqp-outbound网关从不同的线程产生回复(类似于jms-outbound网关,具有不同的队列,使用相关键关联请求/响应)。
无法将此消息与此示例相关联。
Spring集成
配置
@Bean public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); template.setQueue("reply_queue"); return template; } @Bean public Binding binding(){ return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue"); } @Bean public DirectExchange exchange(){ return new DirectExchange("request_exchange"); } @Bean public Queue queue(){ return new Queue("request_queue", true, false, true); } @Bean public Binding bindingReply(){ return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue"); } @Bean public DirectExchange exchangeReply(){ return new DirectExchange("reply_exchange"); } @Bean public Queue replyQueue(){ return new Queue("reply_queue", true, false, true); }
服务
@Service public final class OuboundService { public Message createRequest(String message){ System.out.println("Inside createRequest : "+ message); final String transactionId = UUID.randomUUID().toString(); final Message builtMessage = MessageBuilder.withBody(message.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setHeader(AmqpHeaders.CORRELATION_ID, transactionId) .build(); return builtMessage; } public Message processRequest(Message message){ System.out.println("Inside process Request : "+ new String(message.getBody())); System.out.println("Header values : "+message.getMessageProperties().getHeaders()); final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties()) .copyHeaders(message.getMessageProperties().getHeaders()).build(); return result; } }
错误:
org.springframework.integration.handler.ReplyRequiredException:处理程序’outboundGtwyId’没有产生回复,其’requiresReply’属性设置为true。
GitHub源代码(已解决的解决方案)
https://github.com/kingkongprab/spring-amqp-outbound-gateway
相关性也在Spring AMQP中完成。 有关详细信息,请参阅其RabbitTemplate#sendAndRecevie()
。 参考手册中也有关于此事的良好文档。
与其AbstractAmqpOutboundEndpoint
和AmqpInboundGateway
实现的Spring集成提供了开箱即用的请求 – 应答相关解决方案。 如果您无法在服务器端使用AmqpInboundGateway
,则应确保从接收的请求到回复的correlationId
传输。 是的,您可以使用专用交换进行回复,这是RabbitTemplate#setQueue()
支持等待客户端,出站端的回复。 但如果没有适当的correlation
转移,这仍然无法奏效。 另请参阅https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-message-headers ,了解如何在Spring中映射标头(包括correlationId
)的信息积分。
UPDATE
感谢您分享您的申请。
好吧,现在我看到几个问题:
-
你肯定错过了
replyQueue
绑定:@Bean public Binding bindingReply(){ return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue"); }
-
RabbitTemplate
必须使用setReplyAddress()
。 您必须为reply_queue
配置MessageListenerContainer
并将RabbitTemplate
作为侦听器:@Bean public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); template.setReplyAddress(replyQueue().getName()); return template; } @Bean public MessageListenerContainer replyContainer(RabbitTemplate template) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(template.getConnectionFactory()); container.setQueues(replyQueue()); container.setMessageListener(template); return container; }
-
使用
org.springframework.amqp.core.Message
操作的OuboundService
是无用的。 通道适配器不知道这种类型的payload
,您的自定义Message
只是作为另一个org.springframework.amqp.core.Message
的序列化body
。 我把它改成了这一切,一切运作良好:public String createRequest(String message){ System.out.println("Inside createRequest : "+ message); return message; } public Message processRequest(Message message){ System.out.println("Inside process Request : " + message); return message; }
无论如何,我建议你重新考虑你的设计,然后回到AmqpInboundGateway
。
BTW在最终解决方案中,您不需要关心任何correlation
。 框架会自动为您完成。