如何从ActiveMQ队列创建Spring Reactor Flux?

我正在试验Spring Reactor 3组件和Spring Integration来从JMS队列创建一个反应流(Flux)。

我试图从JMS队列(使用Spring Integration的ActiveMQ)创建一个反应流(Spring Reactor 3 Flux),以便客户端异步获取JMS消息。 我相信我已正确连接所有内容但客户端在服务器停止之前不会收到任何JMS消息。 然后,所有消息被“推送”到客户端一次。

任何帮助,将不胜感激。

这是我用来配置JMS,Integration组件和被动发布者的配置文件:

@Configuration @EnableJms @EnableIntegration public class JmsConfiguration { @Value("${spring.activemq.broker-url:tcp://localhost:61616}") private String defaultBrokerUrl; @Value("${queues.patient:patient}") private String patientQueue; @Autowired MessageListenerAdapter messageListenerAdapter; @Bean public DefaultJmsListenerContainerFactory myFactory( DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, jmsConnectionFactory()); return factory; } @Bean public Queue patientQueue() { return new ActiveMQQueue(patientQueue); } @Bean public ActiveMQConnectionFactory jmsConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(defaultBrokerUrl); connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero")); return connectionFactory; } // Set the jackson message converter @Bean public JmsTemplate jmsTemplate() { JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(jmsConnectionFactory()); template.setDefaultDestinationName(patientQueue); template.setMessageConverter(jacksonJmsMessageConverter()); return template; } @Bean public MessageListenerAdapter messageListenerAdapter() { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(); messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter()); return messageListenerAdapter; } @Bean public AbstractMessageListenerContainer messageListenerContainer() { DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer(); defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter()); defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory()); defaultMessageListenerContainer.setDestinationName(patientQueue); defaultMessageListenerContainer.setMessageListener(messageListenerAdapter()); defaultMessageListenerContainer.setCacheLevel(100); defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() { @Override public void handleError(Throwable t) { t.printStackTrace(); } }); return defaultMessageListenerContainer; } @Bean // Serialize message content to json using TextMessage public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } @Bean public MessageChannel jmsOutboundInboundReplyChannel() { return MessageChannels.queue().get(); } @Bean public Publisher<Message> pollableReactiveFlow() { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get()) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @Bean public MessageChannel jmsChannel() { return new DirectChannel(); } 

创建Flux的控制器是:

 @RestController @RequestMapping("patients") public class PatientChangePushController { private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now(); private int durationInSeconds = 30; private Patient patient; AtomicReference checkFinally = new AtomicReference(); @Autowired PatientService patientService; @Autowired @Qualifier("pollableReactiveFlow") private Publisher<Message> pollableReactiveFlow; @Autowired private JmsTemplate jmsTemplate; @Autowired private Queue patientQueue; /** * Subscribe to a Flux of a patient that has been updated. * * @param id * @return */ @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Message> getPatientAlerts(@PathVariable Long id) { Flux<Message> messageFlux = Flux.from(pollableReactiveFlow); return messageFlux; } @GetMapping(value = "/generate") public void generateJmsMessage() { for (long i = 0L; i < 100; i++) { Patient patient = new Patient(); patient.setId(i); send(patient); System.out.println("Message was sent to the Queue"); } } void send(Patient patient) { this.jmsTemplate.convertAndSend(this.patientQueue, patient); } } 

如果有人能告诉我为什么在服务器被杀之前消息不会被发送到客户端,我将不胜感激。

适合我:

 @SpringBootApplication @RestController public class SpringIntegrationSseDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringIntegrationSseDemoApplication.class, args); } @Autowired private ConnectionFactory connectionFactory; @Autowired private JmsTemplate jmsTemplate; @Bean public Publisher> jmsReactiveSource() { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(this.connectionFactory) .destination("testQueue")) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux getPatientAlerts() { return Flux.from(jmsReactiveSource()) .map(Message::getPayload); } @GetMapping(value = "/generate") public void generateJmsMessage() { for (int i = 0; i < 100; i++) { this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1)); } } } 

在一个终端,我有curl http://localhost:8080/events等待来自该Flux SSE。

在其他终端我执行curl http://localhost:8080/generate并在第一个中看到:

 data:testMessage #1 data:testMessage #2 data:testMessage #3 data:testMessage #4 

我使用Spring Boot 2.0.0.BUILD-SNAPSHOT。

另见: https : //spring.io/blog/2017/03/08/spring-tips-server-sent-events-sse