Group在RabbitMQ中收到消息,最好使用Spring AMQP?

我正在接收来自服务(S)的消息,该服务将每个单独的属性更改作为单独的消息发布到实体。 一个人为的例子是这样的实体:

Person { id: 123 name: "Something", address: {...} } 

如果在同一事务中更新了名称和地址,则(S)将发布两条消息, PersonNameCorrectedPersonMoved 。 问题出在接收方,我正在存储此Person实体的投影,每个属性更改都会导致写入数据库。 因此,在这个例子中,将有两次写入数据库,但如果我可以在短时间内批量处理消息并按id分组,那么我只需要对数据库进行一次写入。

如何在RabbitMQ中处理这个问题? Spring AMQP是否提供了更简单的抽象?

请注意,我已经简要介绍了预取,但我不确定这是否可行。 如果我理解正确的话,预取也是基于连接的。 我试图在每个队列的基础上实现这一点,因为如果批处理(因此增加了延迟)是要走的路,我不想将这种延迟添加到我的服务所消耗的所有队列中(但仅限于那些需要“group-by-id”function)。

预取对于这样的情况无济于事。

考虑使用Spring Integration ,它具有位于Spring AMQP之上的适配器; 它还提供了一个聚合器,可用于在将消息发送到管道中的下一个阶段之前将消息组合在一起。

编辑

这是一个快速启动应用程序来展示……

 @SpringBootApplication public class So42969130Application implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(So42969130Application.class, args) .close(); } @Autowired private RabbitTemplate template; @Autowired private Handler handler; @Override public void run(String... args) throws Exception { this.template.convertAndSend("so9130", new PersonNameChanged(123)); this.template.convertAndSend("so9130", new PersonMoved(123)); this.handler.latch.await(10, TimeUnit.SECONDS); } @Bean public IntegrationFlow flow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130") .messageConverter(converter())) .aggregate(a -> a .correlationExpression("payload.id") .releaseExpression("false") // open-ended release, timeout only .sendPartialResultOnExpiry(true) .groupTimeout(2000)) .handle(handler()) .get(); } @Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } @Bean public Handler handler() { return new Handler(); } @Bean public Queue queue() { return new Queue("so9130", false, false, true); } public static class Handler { private final CountDownLatch latch = new CountDownLatch(1); @ServiceActivator public void handle(Collection aggregatedData) { System.out.println(aggregatedData); this.latch.countDown(); } } public static class PersonNameChanged { private int id; PersonNameChanged() { } PersonNameChanged(int id) { this.id = id; } public int getId() { return this.id; } public void setId(int id) { this.id = id; } @Override public String toString() { return "PersonNameChanged [id=" + this.id + "]"; } } public static class PersonMoved { private int id; PersonMoved() { } PersonMoved(int id) { this.id = id; } public int getId() { return this.id; } public void setId(int id) { this.id = id; } @Override public String toString() { return "PersonMoved [id=" + this.id + "]"; } } } 

双响炮:

   4.0.0 com.example so42969130 2.0.0-BUILD-SNAPSHOT jar so42969130 Demo project for Spring Boot  org.springframework.boot spring-boot-starter-parent 1.5.2.RELEASE     UTF-8 UTF-8 1.8    org.springframework.boot spring-boot-starter-integration   org.springframework.integration spring-integration-amqp   org.springframework.integration spring-integration-java-dsl   org.springframework.boot spring-boot-starter-test test      org.springframework.boot spring-boot-maven-plugin     

结果:

 2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .siaAbstractCorrelatingMessageHandler : Expiring MessageGroup with correlationKey[123] [PersonNameChanged [id=123], PersonMoved [id=123]]