spring amqp rabbitmq MessageListener无效
我想使用spring amqp使用rabbitmq,下面是我的配置。
这是一个简单的Message Listener类,
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class ImportMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("consumer output: " + message); } }
这是生产者(spring批次的itemWriter),
public class ImportItemWriter implements ItemWriter { private AmqpTemplate template; public AmqpTemplate getTemplate() { return template; } public void setTemplate(AmqpTemplate template) { this.template = template; } public void write(List items) throws Exception { for (T item : items) { Object reply = template.convertSendAndReceive(item.toString()); System.out.println("producer output: " + reply); } } }
当我运行spring批处理作业时,将调用ImportItemWriter.write。 但是ImportMessageListener.onMessage不起作用。 它不打印消息。 我在控制台上获得所有项目的输出
producer output: null producer output: null producer output: null producer output: null producer output: null producer output: null producer output: null
您的消费者没有发送结果……
@Override public void onMessage(Message message) { System.out.println("consumer output: " + message); }
将其更改为简单的POJO; 容器的MessageListenerAdapter
将为您处理转换,并发送结果。
@Override public String handleMessage(String message) { System.out.println("consumer output: " + message); return "result"; }
编辑:
您还没有设置任何交换或路由到您的队列。 如果要使用默认交换/路由,请使用…
convertSendAndReceive("", queueName, item.toString());
EDIT2:
或者,将模板上的routingKey
设置为队列名称,然后您可以使用更简单的方法。
...sendAndReceive()
方法用于请求/回复方案,因此需要阻塞。 要异步执行,必须使用其中一个...send()
方法,并连接自己的SimpleListenerContainer
以接收回复; 你必须做自己的关联。 使用
public void convertAndSend(Object message, MessagePostProcessor postProcessor)
并在您的消息发布处理器中,设置replyTo
和correlationId
标头…
message.getMessageProperties().setReplyTo("foo"); message.getMessageProperties().setCorrelationId("bar");
或者,自己构建Message
对象(例如,使用MessageBuilder
)并使用send
方法…
template.send(MessageBuilder.withBody("foo".getBytes()) .setReplyTo("bar") .setCorrelationId("baz".getBytes()) .build());
每个请求都需要一个唯一的correlationId
以便您可以关联响应。