spring amqp rabbitmq MessageListener无效

我想使用spring amqp使用rabbitmq,下面是我的配置。

        

这是一个简单的Message Listener类,

 import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class ImportMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("consumer output: " + message); } } 

这是生产者(spring批次的itemWriter),

 public class ImportItemWriter implements ItemWriter { private AmqpTemplate template; public AmqpTemplate getTemplate() { return template; } public void setTemplate(AmqpTemplate template) { this.template = template; } public void write(List items) throws Exception { for (T item : items) { Object reply = template.convertSendAndReceive(item.toString()); System.out.println("producer output: " + reply); } } } 

当我运行spring批处理作业时,将调用ImportItemWriter.write。 但是ImportMessageListener.onMessage不起作用。 它不打印消息。 我在控制台上获得所有项目的输出

 producer output: null producer output: null producer output: null producer output: null producer output: null producer output: null producer output: null 

您的消费者没有发送结果……

 @Override public void onMessage(Message message) { System.out.println("consumer output: " + message); } 

将其更改为简单的POJO; 容器的MessageListenerAdapter将为您处理转换,并发送结果。

 @Override public String handleMessage(String message) { System.out.println("consumer output: " + message); return "result"; } 

编辑:

您还没有设置任何交换或路由到您的队列。 如果要使用默认交换/路由,请使用…

 convertSendAndReceive("", queueName, item.toString()); 

EDIT2:

或者,将模板上的routingKey设置为队列名称,然后您可以使用更简单的方法。

...sendAndReceive()方法用于请求/回复方案,因此需要阻塞。 要异步执行,必须使用其中一个...send()方法,并连接自己的SimpleListenerContainer以接收回复; 你必须做自己的关联。 使用

 public void convertAndSend(Object message, MessagePostProcessor postProcessor) 

并在您的消息发布处理器中,设置replyTocorrelationId标头…

 message.getMessageProperties().setReplyTo("foo"); message.getMessageProperties().setCorrelationId("bar"); 

或者,自己构建Message对象(例如,使用MessageBuilder )并使用send方法…

 template.send(MessageBuilder.withBody("foo".getBytes()) .setReplyTo("bar") .setCorrelationId("baz".getBytes()) .build()); 

每个请求都需要一个唯一的correlationId以便您可以关联响应。