Group在RabbitMQ中收到消息,最好使用Spring AMQP?
我正在接收来自服务(S)的消息,该服务将每个单独的属性更改作为单独的消息发布到实体。 一个人为的例子是这样的实体:
Person { id: 123 name: "Something", address: {...} }
如果在同一事务中更新了名称和地址,则(S)将发布两条消息, PersonNameCorrected
和PersonMoved
。 问题出在接收方,我正在存储此Person
实体的投影,每个属性更改都会导致写入数据库。 因此,在这个例子中,将有两次写入数据库,但如果我可以在短时间内批量处理消息并按id分组,那么我只需要对数据库进行一次写入。
如何在RabbitMQ中处理这个问题? Spring AMQP是否提供了更简单的抽象?
请注意,我已经简要介绍了预取,但我不确定这是否可行。 如果我理解正确的话,预取也是基于连接的。 我试图在每个队列的基础上实现这一点,因为如果批处理(因此增加了延迟)是要走的路,我不想将这种延迟添加到我的服务所消耗的所有队列中(但仅限于那些需要“group-by-id”function)。
预取对于这样的情况无济于事。
考虑使用Spring Integration ,它具有位于Spring AMQP之上的适配器; 它还提供了一个聚合器,可用于在将消息发送到管道中的下一个阶段之前将消息组合在一起。
编辑
这是一个快速启动应用程序来展示……
@SpringBootApplication public class So42969130Application implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(So42969130Application.class, args) .close(); } @Autowired private RabbitTemplate template; @Autowired private Handler handler; @Override public void run(String... args) throws Exception { this.template.convertAndSend("so9130", new PersonNameChanged(123)); this.template.convertAndSend("so9130", new PersonMoved(123)); this.handler.latch.await(10, TimeUnit.SECONDS); } @Bean public IntegrationFlow flow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130") .messageConverter(converter())) .aggregate(a -> a .correlationExpression("payload.id") .releaseExpression("false") // open-ended release, timeout only .sendPartialResultOnExpiry(true) .groupTimeout(2000)) .handle(handler()) .get(); } @Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } @Bean public Handler handler() { return new Handler(); } @Bean public Queue queue() { return new Queue("so9130", false, false, true); } public static class Handler { private final CountDownLatch latch = new CountDownLatch(1); @ServiceActivator public void handle(Collection> aggregatedData) { System.out.println(aggregatedData); this.latch.countDown(); } } public static class PersonNameChanged { private int id; PersonNameChanged() { } PersonNameChanged(int id) { this.id = id; } public int getId() { return this.id; } public void setId(int id) { this.id = id; } @Override public String toString() { return "PersonNameChanged [id=" + this.id + "]"; } } public static class PersonMoved { private int id; PersonMoved() { } PersonMoved(int id) { this.id = id; } public int getId() { return this.id; } public void setId(int id) { this.id = id; } @Override public String toString() { return "PersonMoved [id=" + this.id + "]"; } } }
双响炮:
4.0.0 com.example so42969130 2.0.0-BUILD-SNAPSHOT jar so42969130 Demo project for Spring Boot org.springframework.boot spring-boot-starter-parent 1.5.2.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-amqp org.springframework.integration spring-integration-java-dsl org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
结果:
2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .siaAbstractCorrelatingMessageHandler : Expiring MessageGroup with correlationKey[123] [PersonNameChanged [id=123], PersonMoved [id=123]]