Tag: completable future

CompletableFuture | thenApplyAsync vs thenCompose及其用例

我试图理解CompletableFuture,并遇到了2个方法,然后是ApplyAsync然后是Make。 我试图了解这两者之间的区别。 CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + ” Printing hello”); return “Hello”; }).thenCompose((String s) -> { return CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + ” Adding abc”); return “abc “+s;}); }).thenApplyAsync((String s) -> { System.out.println(Thread.currentThread().getName() + ” Adding world”); return s + ” World”; }).thenApplyAsync((String s) -> { System.out.println(Thread.currentThread().getName() + ” Adding name”); if […]

如何在流中将1个完成的未来分成许多可完成的未来?

例如,我有这样的方法: public CompletableFuture getPage(int i) { … } public CompletableFuture getDocument(int i) { … } public CompletableFuture parseLinks(Document doc) { … } 我的流量: List list = IntStream .range(0, 10) .mapToObj(i -> getPage(i)) // I want method like this: .thenApplyAndSplit(CompletableFuture page -> { List<CompletableFuture> docs = page.getDocsId() .stream() .map(i -> getDocument(i)) .collect(Collectors.toList()); return docs; }) .map(CompletableFuture […]

执行CompletableFuture的多个thenAccept块的顺序是什么

所以我有一个返回CompletableFuture的方法。 在返回之前,此方法添加一个带有thenAccept的块,该块在CompletableFuture完成后执行。 此方法的调用者还使用thenAccept添加另一个块。 显然,这可以继续多个链式调用。 执行thenAccept调用返回的CompletionStage顺序是什么? 是否保证是添加它们的顺序? 如果没有,如何保证它们按照添加顺序执行? PS:我根据自己对CompletableFuture和本文的经验提出这个问题

CompletableFuture / ForkJoinPool设置类加载器

我解决了一个非常具体的问题,其解决方案似乎是基本的: 我的(Spring)应用程序的类加载器层次结构是这样的: SystemClassLoader -> PlatformClassLoader -> AppClassLoader 如果我使用Java CompleteableFuture来运行线程。 线程的ContextClassLoader是: SystemClassLoader -> PlatformClassLoader -> ThreadClassLoader 因此,虽然我不得不访问AppClassLoader任何类,因为所有外部库类都驻留在那里。 源代码库非常大,所以我不希望/不能将所有与线程相关的部分重写为其他内容(例如,将自定义执行程序传递给每个调用)。 所以我的问题是: 如何使用例如CompleteableFuture.supplyAsync()创建的线程使用AppClassLoader作为父级? (而不是PlatformClassloader ) 我发现ForkJoinPool用于创建线程。 但在我看来,一切都是静态的和最终的 。 所以我怀疑即使在系统属性中设置自定义ForkJoinWorkerThreadFactory也会有所帮助。 或者是吗? 编辑以回答评论中的问题: 你在哪里部署? 这是在jetty / tomcat /任何JEE容器内运行吗? 我正在使用默认的Spring Boot设置,因此使用内部tomcat容器。 你有什么确切的问题? 确切的问题是: java.lang.IllegalArgumentException:从类加载器中看不到从方法引用的org.keycloak.admin.client.resource.RealmsResource 您提交给supplyAsync()的作业是从AppClassLoader创建的,不是吗? 从MainThread调用supplyAsync ,它使用AppClassLoader 。 但是,调试应用程序显示所有此类线程都将PlatformClassLoader作为其父级。 至于我的理解,这是因为ForkJoinPool.commonPool()是在应用程序启动期间构建的(因为它是静态的),因此使用默认的类加载器作为父类PlatformClassLoader 。 因此,来自此池的所有线程都将PlatformClassLoader作为ContextClassLoader (而不是AppClassLoader )的父级。 当我在MainThread创建自己的执行程序并将此执行程序传递给supplyAsync一切正常 – 我可以在调试期间看到,确实现在AppClassLoader是我的ThreadClassLoader的父ThreadClassLoader 。 这似乎证实了我在第一种情况下的假设,即公共池不是由MainThread创建的,至少不是在使用AppClassLoader本身时。 完整堆栈跟踪: java.lang.IllegalArgumentException: org.keycloak.admin.client.resource.RealmsResource […]

如何使用ExecutorService进行轮询,直到结果到达

我有一个场景,我必须轮询远程服务器检查任务是否已完成。 一旦有,我会进行不同的调用以检索结果。 我原本认为我应该使用带有scheduleWithFixedDelay的SingleThreadScheduledExecutor进行轮询: ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS); public void poll(String jobId) { boolean jobDone = remoteServer.isJobDone(jobId); if (jobDone) { retrieveJobResult(jobId); } } 但是因为我只能提供一个Runnable来scheduleWithFixedDelay不能返回任何东西的scheduleWithFixedDelay ,所以我不明白future什么时候会完成。 调用future.get()甚至意味着什么? 我在等什么结果? 我第一次检测到远程任务已经完成,我想执行一个不同的远程调用并将其结果设置为future的值。 我想我可以使用CompletableFuture,我会转发到我的poll方法,然后将它转发到我最终完成它的retrieveTask方法: CompletableFuture result = new CompletableFuture(); ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS); public void poll(String […]

在编写Java CompletableFutures时使用哪个执行程序?

我在某个存储库类上有一个返回CompletableFuture 。 完成这些期货的代码使用阻止的第三方库。 我打算有一个单独的有界Executor ,这个存储库类将用它来进行这些阻塞调用。 这是一个例子: public class PersonRepository { private Executor executor = … public CompletableFuture create(Person person) {…} public CompletableFuture delete(Person person) {…} } 我的应用程序的其余部分将组成这些未来,并对结果做一些其他事情。 当提供给thenAccept , thenCompose , whenComplete等的其他函数时,我不希望它们在存储库的Executor上运行。 另一个例子: public CompletableFuture replacePerson(Person person) { final PersonRepository repo = getPersonRepository(); return repo.delete(person) .thenAccept(existed -> { if (existed) System.out.println(“Person deleted”)); else System.out.println(“Person did […]

您如何访问已传递给CompletableFuture allOf的已完成期货?

我试图掌握Java 8 CompletableFuture。 我怎样才能将这些人加入到“allOf”之后再归还给他们。 下面的代码不起作用,但让您了解我尝试过的内容。 在javascript ES6中我会这样做 Promise.all([p1, p2]).then(function(persons) { console.log(persons[0]); // p1 return value console.log(persons[1]); // p2 return value }); 到目前为止我在Java方面的努力 public class Person { private final String name; public Person(String name) { this.name = name; } public String getName() { return name; } } @Test public void combinePersons() throws ExecutionException, InterruptedException { CompletableFuture […]

递归取消allOff CompletableFuture

如果我有 CompletableFuture future1 = service.request(param1); CompletableFuture future2 = service.request(param2); CompletableFuture many = CompletableFuture.allOf(future1, future2); 当我做many.cancel()时会发生什么? future1和future2被取消吗? 如果没有,那么实现这一目标的最简洁方法是什么? 我不愿意坚持future1和future2 ,只是为了能够在取消many时取消它们。 关于为什么我想要这个的一些背景:当接收一条数据时,我需要请求匹配的,可能未来的数据来执行计算。 如果有更新的数据到达,我想取消先前计算的完成,因为结果将立即被新计算取代。

在期货清单上流式传输的最有效方式

我通过流式传输对象列表来调用异步客户端方法。 该方法返回Future。 迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)? 注意:异步客户端仅返回Future not CompletableFuture。 以下是代码: List<Future> listOfFuture = objectsToProcess.parallelStream() .map((object) -> { /* calling an async client returning a Future */ }) .collect(Collectors.toList());

CompletableFuture withFallback /只处理一些错误

我通过CompletableFuture收到服务电话的回复。 我想处理服务返回的一些已知exception – 例如乐观并发控制冲突。 这就是我所拥有的。 有没有更好的方法来做这个不包装exception或使用SneakyThrows? 包装exception意味着其他exception处理程序必须检查因果链而不仅仅是使用instanceof 。 someService.call(request) .handle((response, error) -> { if (error == null) return CompletableFuture.completedFuture(response); else if (error instanceof OCCException) return CompletableFuture.completedFuture(makeDefaultResponse()); CompletableFuture errorFuture = new CompletableFuture(); errorFuture.completeExceptionally(error); return errorFuture; }).thenCompose(Function.identity()); 同样的,有没有办法复制番石榴withFallback没有包装解开? CompletableFuture withFallback(CompletableFuture future, Function<Throwable, ? extends CompletableFuture> fallback) { return future.handle((response, error) -> { if (error == null) return […]