How to create a Spring Reactor Flux from an Http integration flow?

I have a question very similar to this one: How to create a Spring Reactor Flux from an ActiveMQ queue?

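The one difference is that the messages come from an Http endpoint rather than a JMS queue. The problem is that, for some reason, the message channel is not populated, or Flux.from() does not pick it up. The log entries show that a GenericMessage is created from the Http Integration flow, with the path variable as its payload, but it is never enqueued/published to the channel? I tried .channel(MessageChannels.queue()) and .channel(MessageChannels.publishSubscribe()); it made no difference, and the event stream stays empty. Here is the code: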

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows
                .from(Http.inboundChannelAdapter("/eventmessage/{id}")
                        .requestMapping(r -> r.methods(HttpMethod.POST))
                        .payloadExpression("#pathVariables.id"))
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @GetMapping(value = "eventmessagechannel/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages(@PathVariable String id) {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }

UPDATE1:

build.gradle

    buildscript {
        ext {
            springBootVersion = '2.0.0.M2'
        }
        repositories {
            mavenCentral()
            maven { url "https://repo.spring.io/snapshot" }
            maven { url "https://repo.spring.io/milestone" }
        }
        dependencies {
            classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        }
    }

    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = 1.8

    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }

    dependencies {
        compile('org.springframework.boot:spring-boot-starter-freemarker')
        compile('org.springframework.boot:spring-boot-starter-integration')
        compile('org.springframework.boot:spring-boot-starter-web')
        compile('org.springframework.boot:spring-boot-starter-webflux')
        compile('org.springframework.integration:spring-integration-http')
        testCompile('org.springframework.boot:spring-boot-starter-test')
        testCompile('io.projectreactor:reactor-test')
    }

UPDATE2

It works when @SpringBootApplication and @RestController are defined in a single file, but stops working when @SpringBootApplication and @RestController are in separate files.

TestApp.java

    package com.example;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class TestApp {

        public static void main(String[] args) {
            SpringApplication.run(TestApp.class, args);
        }
    }

TestController.java

    package com.example.controller;

    import org.springframework.context.annotation.Bean;
    import org.reactivestreams.Publisher;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.MediaType;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.channel.MessageChannels;
    import org.springframework.integration.handler.LoggingHandler;
    import org.springframework.integration.http.dsl.Http;
    import org.springframework.messaging.Message;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.bind.annotation.GetMapping;
    import reactor.core.publisher.Flux;

    @RestController
    public class TestController {

        @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows
                    .from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r.methods(HttpMethod.POST))
                            .payloadExpression("#pathVariables.id"))
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }
    }

This works for me:

    @SpringBootApplication
    @RestController
    public class SpringIntegrationSseDemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
        }

        @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows
                    .from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r.methods(HttpMethod.POST))
                            .payloadExpression("#pathVariables.id"))
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }
    }

I have these dependencies in the POM:

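    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>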

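I run the application and open two terminals. In the first one I run:

    curl http://localhost:8080/events

to listen for SSEs.

In the second one I execute this:

    curl -X POST http://localhost:8080/message/foo
    curl -X POST http://localhost:8080/message/bar
    curl -X POST http://localhost:8080/message/666

So, the first terminal responds like this: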

    data:foo
    data:bar
    data:666
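If you want to consume that stream from code rather than from curl, the `data:` lines have to be picked out of the response body. The following is only a minimal sketch of that step, using nothing but the JDK. The class name `SseDataLines`, the helper `extractData`, and the sample body in `main` are made up for illustration (the sample assumes each event is a single `data:` line followed by a blank line); it deliberately ignores multi-line events and the `event:`, `id:` and `retry:` fields.

```java
import java.util.ArrayList;
import java.util.List;

public class SseDataLines {

    // Hypothetical helper: returns the value of every "data:" line in an SSE body.
    // Simplification: each "data:" line is treated as its own event.
    static List<String> extractData(String body) {
        List<String> payloads = new ArrayList<>();
        // \R matches any line break (\n, \r\n, \r), so blank separator lines
        // between events show up as empty strings and are skipped below.
        for (String line : body.split("\\R")) {
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // The SSE format allows one optional space after the colon.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                payloads.add(value);
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Assumed wire format for the three POSTs shown above.
        String body = "data:foo\n\ndata:bar\n\ndata:666\n\n";
        System.out.println(extractData(body)); // prints [foo, bar, 666]
    }
}
```

In a real client you would feed this line by line from the open HTTP response instead of from a complete string, since an SSE response never ends on its own.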

Note that we don't need the spring-boot-starter-webflux dependency. Flux to SSE works fine with regular MVC on a Servlet container.

Spring Integration will also support WebFlux soon: https://jira.spring.io/browse/INT-4300. So, you will be able to configure something like this there:

    IntegrationFlows
        .from(Http.inboundReactiveGateway("/sse")
            .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

and rely entirely on WebFlux, without any Servlet container dependency.