spring boot rabbitmq MappingJackson2MessageConverter自定义对象转换

我正在尝试用 Spring Boot 创建一个简单的应用程序,它“生产”消息并发送到 RabbitMQ 的交换机/队列;另外还有一个“消费”这些消息的示例 Spring Boot 应用程序。所以我有两个应用程序(或者说微服务):1)“生产者”微服务;2)“消费者”微服务。

“生产者”有2个域对象。 Foo和Bar应该转换为json并发送给rabbitmq。 “消费者”应该分别接收并将json消息转换为域Foo和Bar。 出于某种原因,我不能完成这个简单的任务。 关于这个的例子并不多。 对于消息转换器我想使用org.springframework.messaging.converter.MappingJackson2MessageConverter

这是我到目前为止:

生产者微服务

package demo.producer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.stereotype.Service; @SpringBootApplication public class ProducerApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Bean Queue queue() { return new Queue("queue", false); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("queue"); } @Bean public MappingJackson2MessageConverter jackson2Converter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); return converter; } @Autowired private Sender sender; @Override public void run(String... 
args) throws Exception { sender.sendToRabbitmq(new Foo(), new Bar()); } } @Service class Sender { @Autowired private RabbitMessagingTemplate rabbitMessagingTemplate; @Autowired private MappingJackson2MessageConverter mappingJackson2MessageConverter; public void sendToRabbitmq(final Foo foo, final Bar bar) { this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter); this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo); this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar); } } class Bar { public int age = 33; } class Foo { public String name = "gustavo"; } 

消费者微服务

 package demo.consumer; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Service; @SpringBootApplication @EnableRabbit public class ConsumerApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @Autowired private Receiver receiver; @Override public void run(String... args) throws Exception { } } @Service class Receiver { @RabbitListener(queues = "queue") public void receiveMessage(Foo foo) { System.out.println("Received "); } @RabbitListener(queues = "queue") public void receiveMessage(Bar bar) { System.out.println("Received "); } } class Foo { public String name; } class Bar { public int age; } 

这是我得到的异常:

  org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)] Bean [demo.consumer.Receiver@1672fe87] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119) at java.lang.Thread.run(Thread.java:745) Caused by: 
org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message ... 13 common frames omitted Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113) ... 12 common frames omitted 

异常信息说找不到转换器,这确实如此。我的问题是我不知道如何在消费者端设置 MappingJackson2MessageConverter 转换器(请注意,我想使用 org.springframework.messaging.converter.MappingJackson2MessageConverter,而不是 org.springframework.amqp.support.converter.JsonMessageConverter)。

有什么想法吗 ?

以防万一,您可以在以下 URL 查看此示例项目:https://github.com/gustavoorsi/rabbitmq-consumer-receiver

好的,我终于让它工作了。

Spring使用PayloadArgumentResolver提取,转换并将转换后的消息设置为使用@RabbitListener注释的方法参数。 不知何故,我们需要将mappingJackson2MessageConverter设置为此对象。

因此,在消费者应用程序中,我们需要实现 RabbitListenerConfigurer。通过重写 configureRabbitListeners(RabbitListenerEndpointRegistrar registrar),我们可以设置一个自定义的 DefaultMessageHandlerMethodFactory;我们为这个工厂设置消息转换器,工厂就会使用正确的转换器来创建 PayloadArgumentResolver。

这是代码片段,我还更新了git项目 。

ConsumerApplication.java

 package demo.consumer; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.stereotype.Service; @SpringBootApplication @EnableRabbit public class ConsumerApplication implements RabbitListenerConfigurer { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @Bean public MappingJackson2MessageConverter jackson2Converter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); return converter; } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(jackson2Converter()); return factory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } @Autowired private Receiver receiver; } @Service class Receiver { @RabbitListener(queues = "queue") public void receiveMessage(Foo foo) { System.out.println("Received <" + foo.name + ">"); } @RabbitListener(queues = "queue") public void receiveMessage(Bar bar) { System.out.println("Received <" + bar.age + ">"); } } class Foo { public String name; } class Bar { public int age; } 

因此,如果您运行Producer微服务,它将在队列中添加2条消息。 一个表示Foo对象,另一个表示Bar对象。 通过运行消费者微服务,您将看到两者都被Receiver类中的相应方法使用。


更新的问题:

我认为我这边对队列的理解存在概念性问题。声明两个带 @RabbitListener 注解、指向同一队列的方法,并不能实现我想要的效果。上述解决方案无法正常工作。如果你向 RabbitMQ 发送比如 6 条 Foo 消息和 3 条 Bar 消息,带 Foo 参数的监听器并不会恰好被调用 6 次。看起来这些监听器是并行调用的,因此无法根据方法参数类型来区分应该调用哪个监听器。我的解决方案(我不确定这是否是最好的方式,欢迎提出建议)是为每个实体创建一个队列。所以现在我有 queue.bar 和 queue.foo 两个队列,并将注解更新为 @RabbitListener(queues = "queue.foo")。我再次更新了代码,你可以在我的 git 仓库中查看。

我自己没有这样做过,但似乎你需要在 RabbitTemplate 上设置并注册合适的转换器。请参阅 Spring 文档的 3.6.2 节。我知道那里是用 AMQP 的类来配置的,但如果你提到的消息类是兼容的,就没有理由不能替换它。这篇参考文档似乎还解释了如何使用 Java 配置而不是 XML 来实现。我没有真正使用过 Rabbit,所以没有任何亲身经验,但我很乐意听到你的发现。