Tag: reactive programming

RxJava是否适合分支工作流程?

我正在使用RxJava来处理我们从队列中提取的一些通知。 RxJava似乎在一个简单的工作流程中运行良好,现在有了新的要求,流程越来越复杂,分支越多(请参见下图作为参考) 我尝试通过一个小unit testing来举例说明流程: @Test public void test() { Observable.range(1, 100) .groupBy(n -> n % 3) .toMap(GroupedObservable::getKey) .flatMap(m1 -> { Observable ones1 = m1.get(0); Observable twos1 = m1.get(1).map(n -> n – 10); Observable threes = m1.get(2).map(n -> n + 100); Observable onesAndTwos = Observable.merge(ones1, twos1) .map(n -> n * 3) .groupBy(n -> n % 2) […]

如何从Http集成流程创建Spring Reactor Flux?

我有一个与此非常类似的问题如何从ActiveMQ队列创建Spring Reactor Flux? 一个区别是消息来自Http端点而不是JMS队列。 问题是消息通道由于某种原因没有填充,或者Flux.from()没有获取它。 日志条目显示GenericMessage是从Http Integration流创建的,有效负载作为路径变量,但不会入队/发布到通道? 我试过.channel(MessageChannels.queue())和.channel(MessageChannels.publishSubscribe())没有任何区别,事件流是空的。 这是代码: @Bean public Publisher<Message> httpReactiveSource() { return IntegrationFlows. from(Http.inboundChannelAdapter(“/eventmessage/{id}”) .requestMapping(r -> r .methods(HttpMethod.POST) ) .payloadExpression(“#pathVariables.id”) ) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @GetMapping(value=”eventmessagechannel/{id}”, produces=MediaType.TEXT_EVENT_STREAM_VALUE) public Flux eventMessages(@PathVariable String id){ return Flux.from(httpReactiveSource()) .map(Message::getPayload); } UPDATE1: 的build.gradle buildscript { ext { springBootVersion = ‘2.0.0.M2’ } repositories { mavenCentral() maven { […]

如何从ActiveMQ队列创建Spring Reactor Flux?

我正在试验Spring Reactor 3组件和Spring Integration来从JMS队列创建一个反应流(Flux)。 我试图从JMS队列(使用Spring Integration的ActiveMQ)创建一个反应流(Spring Reactor 3 Flux),以便客户端异步获取JMS消息。 我相信我已正确连接所有内容但客户端在服务器停止之前不会收到任何JMS消息。 然后,所有消息被“推送”到客户端一次。 任何帮助,将不胜感激。 这是我用来配置JMS,Integration组件和被动发布者的配置文件: @Configuration @EnableJms @EnableIntegration public class JmsConfiguration { @Value(“${spring.activemq.broker-url:tcp://localhost:61616}”) private String defaultBrokerUrl; @Value(“${queues.patient:patient}”) private String patientQueue; @Autowired MessageListenerAdapter messageListenerAdapter; @Bean public DefaultJmsListenerContainerFactory myFactory( DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, jmsConnectionFactory()); return factory; } @Bean public Queue patientQueue() { return new […]

如何在RxJava中执行递归可观察调用?

我对RxJava (以及一般的反应范式) 都很陌生 ,所以请耐心等待。 假设我有这个News和这个嵌套的Comment数据结构: public class News { public int id; public int[] commentIds; //only top level comments public News(int id, int[] commentIds) { this.id = id; this.commentIds = commentIds; } } public class Comment { public int id; public int parentId; //ID of parent News or parent comment public int[] childIds; public Comment(int […]

RxJava – 终止无限流

我正在探索反应式编程和RxJava。 这很有趣,但我遇到了一个无法找到答案的问题。 我的基本问题:什么是一种react native的方法来终止一个无限运行的Observable? 我也欢迎关于我的代码的批评和react native最佳实践。 作为练习,我正在编写一个日志文件尾实用程序。 日志文件中的行流由Observable 。 为了让BufferedReader继续读取添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该hibernate并等待更多的记录器文本。 但是虽然我可以使用takeUntil终止Observer,但我需要找到一种干净的方法来终止无限运行的文件观察器。 我可以编写自己的terminateWatcher方法/字段,但这会破坏Observable / Observer封装 – 我希望尽可能严格遵守反应范式。 这是Observable代码: public class FileWatcher implements OnSubscribeFunc { private Path path = . . .; @Override // The generic is pointless but required by the compiler public Subscription onSubscribe(Observer observer) { try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) […]

如何让这个rxjava zip并行运行?

我有一个睡眠方法来模拟一个长时间运行的过程。 private void sleep() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } 然后我有一个方法返回一个Observable,其中包含参数中给出的2个字符串的列表。 它在返回字符串之前调用sleep。 private Observable<List> getStrings(final String str1, final String str2) { return Observable.fromCallable(new Callable<List>() { @Override public List call() { sleep(); List strings = new ArrayList(); strings.add(str1); strings.add(str2); return strings; } }); } 然后我在Observalb.zip中调用了三次getStrings,我希望这三个调用并行运行,所以执行的总时间应该在2秒或3秒之内,因为睡眠只有2秒。 但是,它总共花了六秒钟。 如何让它并行运行,以便在2秒内完成? Observable .zip(getStrings(“One”, “Two”), […]

如何处理RxJava中观察者的onNext引发的exception?

请考虑以下示例: Observable.range(1, 10).subscribe(i -> { System.out.println(i); if (i == 5) { throw new RuntimeException(“oops!”); } }, Throwable::printStackTrace); 这将输出1到5之间的数字,然后打印exception。 我想要实现的是让观察者保持订阅并在抛出exception后继续运行,即打印从1到10的所有数字。 我尝试过使用retry()和其他各种error handling操作符 ,但是,如文档中所述,它们的目的是处理observable本身发出的错误。 最简单的解决方案就是将整个onNext成try-catch块,但这对我来说听起来不是一个好方法。 在类似的Rx.NET问题中 ,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装。 我试图重拍它: Observable origin = Observable.range(1, 10); Observable proxy = Observable.create((Observable.OnSubscribe) s -> origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted)); proxy.subscribe(i -> { System.out.println(i); if (i […]

Paginate Observable结果没有递归 – RxJava

我有一个非常标准的API分页问题,​​您可以使用一些简单的递归来处理。 这是一个捏造的例子: public Observable<List> scan() { return scanPage(Optional.empty(), ImmutableList.of()); } private Observable scanPage(Optional startKey, List results) { return this.scanner.scan(startKey, LIMIT) .flatMap(page -> { if (!page.getLastKey().isPresent()) { return Observable.just(results); } return scanPage(page.getLastKey(), ImmutableList.builder() .addAll(results) .addAll(page.getResults()) .build() ); }); } 但这显然可以创建一个巨大的callstack。 如何强制执行此操作但保持Observable流? 这是一个命令式阻塞示例: public List scan() { Optional startKey = Optional.empty(); final ImmutableList.Builder results = ImmutableList.builder(); do […]