Receiving and sending Java objects with Spring AMQP

I want to implement a Spring AMQP example that sends and receives Java objects using a listener. I tried this:

Sending the Java object:

    ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    AmqpAdmin admin = new RabbitAdmin(connectionFactory);
    admin.declareBinding(BindingBuilder
            .bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
            .to(new TopicExchange(EXCHANGE_PROCESSING))
            .with(ROUTING_KEY_PROCESSING_TRANSACTION));
    AmqpTemplate template = new RabbitTemplate(connectionFactory);

    TransactionsBean obj = new TransactionsBean();
    obj.setId(Long.valueOf(111222333));

Receiving it and sending back another Java object:

    ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    AmqpAdmin admin = new RabbitAdmin(connectionFactory);
    admin.declareBinding(BindingBuilder
            .bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
            .to(new TopicExchange(EXCHANGE_PROCESSING))
            .with(ROUTING_KEY_PROCESSING_TRANSACTION));
    AmqpTemplate template = new RabbitTemplate(connectionFactory);

    TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION);
    System.out.println(" !!!!!!! Received id " + obj.getTransaction_id());

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(new Queue(QUEUE_PROCESSING_TRANSACTION, false));
    container.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            // Receive here Java object and send back another object
        }
    });

Can you show me how to extend this code with just a simple listener, without complex annotations?

The simplest way is to use @RabbitListener. It is even easier with Spring Boot, because Boot wires up the infrastructure beans (template, admin, and so on) for you.

    @SpringBootApplication
    public class So51009346Application {

        public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

        public static void main(String[] args) {
            SpringApplication.run(So51009346Application.class, args);
        }

        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
                System.out.println(reply);
            };
        }

        @Bean
        public Queue queue() {
            return new Queue(QUEUE_PROCESSING_TRANSACTION);
        }

        @Bean
        public TopicExchange te() {
            return new TopicExchange("ex");
        }

        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(te()).with("rk");
        }

    }

    class RequestObject implements Serializable {

        private static final long serialVersionUID = 1L;

    }

    class ReplyObject implements Serializable {

        private static final long serialVersionUID = 1L;

    }

    @Component
    class Listener {

        @RabbitListener(queues = So51009346Application.QUEUE_PROCESSING_TRANSACTION)
        public ReplyObject process(RequestObject ro) {
            return new ReplyObject();
        }

    }
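The request and reply classes above implement Serializable because, as far as I know, the template's default message converter falls back to standard Java serialization for such payloads. The following is only a rough, Spring-free sketch of that round trip; `SerializationSketch`, `Payload`, `toBytes` and `fromBytes` are hypothetical names made up for illustration and are not part of Spring AMQP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSketch {

    // Hypothetical payload type, standing in for RequestObject.
    public static class Payload implements Serializable {

        private static final long serialVersionUID = 1L;

        public final long id;

        public Payload(long id) {
            this.id = id;
        }
    }

    // Serialize an object into the bytes that would travel as the message body.
    public static byte[] toBytes(Serializable object) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        }
        return buffer.toByteArray();
    }

    // Rebuild the object on the receiving side from the message body bytes.
    public static Object fromBytes(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = toBytes(new Payload(111222333L));
        Payload copy = (Payload) fromBytes(body);
        System.out.println("Round-tripped id: " + copy.id);
    }
}
```

The practical consequence is that both sides need the same class on the classpath, with a matching serialVersionUID, for the cast on the receiving side to work.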

If for some reason you don't want to use the annotation, you can wire up the container with a MessageListenerAdapter...

    @SpringBootApplication
    public class So51009346Application {

        public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

        public static void main(String[] args) {
            SpringApplication.run(So51009346Application.class, args);
        }

        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
                System.out.println(reply);
            };
        }

        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
            container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
            container.setMessageListener(new MessageListenerAdapter(listener, "process"));
            return container;
        }

        @Bean
        public Queue queue() {
            return new Queue(QUEUE_PROCESSING_TRANSACTION);
        }

        @Bean
        public TopicExchange te() {
            return new TopicExchange("ex");
        }

        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(te()).with("rk");
        }

    }

    class RequestObject implements Serializable {

        private static final long serialVersionUID = 1L;

    }

    class ReplyObject implements Serializable {

        private static final long serialVersionUID = 1L;

    }

    @Component
    class Listener {

        public ReplyObject process(RequestObject ro) {
            return new ReplyObject();
        }

    }

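Of course, you can wire up the container with the adapter yourself, as in your question, but it is generally better to let Spring manage it as a @Bean; otherwise you will miss some features (for example, event publishing for failures and idle containers). The adapter gets a reference to your request/reply listener and the name of the method to invoke.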
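To make the "listener reference plus method name" idea concrete, here is a rough conceptual sketch in plain Java. It is not how MessageListenerAdapter is actually implemented; `AdapterSketch` and `SimpleAdapter` are hypothetical names, and the real adapter also converts the message body and sends the return value to the reply-to address, which this sketch leaves out.

```java
import java.lang.reflect.Method;

class AdapterSketch {

    // Stand-ins for the request and reply types from the answer.
    public static class RequestObject { }

    public static class ReplyObject { }

    // A plain listener with no messaging annotations or interfaces.
    public static class Listener {

        public ReplyObject process(RequestObject ro) {
            return new ReplyObject();
        }
    }

    // Hypothetical, simplified adapter: holds the delegate and the method name.
    public static class SimpleAdapter {

        private final Object delegate;

        private final String methodName;

        public SimpleAdapter(Object delegate, String methodName) {
            this.delegate = delegate;
            this.methodName = methodName;
        }

        // Look up the named public method by payload type and invoke it reflectively.
        public Object handle(Object payload) throws Exception {
            Method method = delegate.getClass().getMethod(methodName, payload.getClass());
            return method.invoke(delegate, payload);
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleAdapter adapter = new SimpleAdapter(new Listener(), "process");
        Object reply = adapter.handle(new RequestObject());
        System.out.println(reply.getClass().getSimpleName());
    }
}
```

The point of this design is that the listener stays a plain object with an ordinary method signature, which is why `Listener.process` needs nothing Spring-specific on it.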