Tag: forkjoinpool

如何在ForkJoinPool中阻止队列?

我需要在其队列已满时阻止ForkJoinPool上的线程。 这可以在标准的ThreadPoolExecutor中完成,例如: private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } 我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它。 更新:请参阅下面的答案。

Java:如何从运行的线程中获取完成的线程来拾取任务

我正在使用具有不同运行时间的任务的multithreading应用程序。 当一个线程完成时,是否有办法从仍在运行的线程中接管某些任务? 这是一个例子。 我用5个线程启动程序,每个程序有50个任务。 当最快的运行线程完成时,另一个线程仍然有40个任务要完成。 如何让完成的线程从另一个线程中获取20个任务,因此每个线程继续工作20个,而不是等待正在运行的线程完成剩下的40个?

在并行流上调用顺序会使所有先前的操作顺序进行

我有一个重要的数据集,并希望调用缓慢但干净的方法,而不是调用带有副作用的快速方法对第一个结果。 我对中间结果不感兴趣,所以我不想收集它们。 显而易见的解决方案是创建并行流,进行慢速呼叫,再次使流顺序,并进行快速呼叫。 问题是,所有代码都在单线程中执行,没有实际的并行性。 示例代码: @Test public void testParallelStream() throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); Set threads = forkJoinPool.submit(()-> new Random().ints(100).boxed() .parallel() .map(this::slowOperation) .sequential() .map(Function.identity())//some fast operation, but must be in single thread .collect(Collectors.toSet()) ).get(); System.out.println(threads); Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size()); } private String slowOperation(int value) { try { Thread.sleep(100); } […]

Java ForkJoinPool具有非递归任务,是否可以正常工作?

我想通过一种方法将Runnable任务提交到ForkJoinPool: forkJoinPool.submit(Runnable task) 注意,我使用的是JDK 7。 在引擎盖下,它们被转换为ForkJoinTask对象。 我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的。 题: 如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗? 在这种情况下值得吗? 更新1:任务很小,可能不平衡。 即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会导致导致不平衡 。 更新2: Doug Lea在并发JSR-166兴趣小组中写道,给出了一个暗示: 当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务。 我认为,当涉及相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法。 重点是这些任务已经很小,不需要递归分解。 无论是大工作还是小任务, 工作窃取工作都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住。 更新3: ForkJoinPool的可扩展性 – Akka乒乓球队的基准测试显示了很好的结果。 尽管如此,要更有效地应用ForkJoinPool还需要进行性能调整。

Java 8并行流和ThreadLocal

我试图弄清楚如何在Java 8并行流中复制ThreadLocal值。 所以如果我们考虑这个: public class ThreadLocalTest { public static void main(String[] args) { ThreadContext.set(“MAIN”); System.out.printf(“Main Thread: %s\n”, ThreadContext.get()); IntStream.range(0,8).boxed().parallel().forEach(n -> { System.out.printf(“Parallel Consumer – %d: %s\n”, n, ThreadContext.get()); }); } private static class ThreadContext { private static ThreadLocal val = ThreadLocal.withInitial(() -> “empty”); public ThreadContext() { } public static String get() { return val.get(); } […]

Java支持三种不同的并发模型

我在multithreading环境中经历了不同的并发模型( http://tutorials.jenkov.com/java-concurrency/concurrency-models.html ) 本文重点介绍了三种并发模型 。 并行工人 第一个并发模型就是我所说的并行工作模型。 传入的工作分配给不同的工作人员 。 流水线 工人的组织就像工厂assembly线上的工人一样。 每个工人只执行完整工作的一部分。 当该部分完成时,工人将工作转发给下一个工人。 每个工作者都在自己的线程中运行,并且不与其他工作者共享任何状态。 这有时也称为无共享并发模型。 function并行 函数并行的基本思想是使用函数调用实现程序。 函数可以被视为彼此发送消息的“ 代理 ”或“ 参与者 ”,就像在流水线并发模型(AKA反应或事件驱动系统)中一样。 当一个函数调用另一个函数时,这类似于发送消息。 现在我想为这三个概念映射java API支持 并行工作者 :它是ExecutorService , ThreadPoolExecutor , CountDownLatch API吗? assembly线 :将事件发送到JMS等消息传递系统并使用队列和主题的消息传递概念。 function并行 : ForkJoinPool在某种程度上和java 8流。 与溪流相比,ForkJoin池易于理解。 我是否正确映射这些并发模型? 如果没有,请纠正我。

官方文档在哪里说Java的并行流操作使用fork / join?

以下是我对Java 8的Stream框架的理解: 东西创造了一个源流 该实现负责提供BaseStream#parallel()方法,该方法依次返回可以并行运行其操作的Stream。 虽然有人已经找到了一种方法来使用带有Stream框架并行执行的自定义线程池,但我无法在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容 。 ( Collection#parallelStream() , StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源)。 我只能搜索搜索结果的花絮是: Lambda的状态:图书馆版(“引擎盖下的并行”) 模糊地提到Stream框架和Fork / Join机制。 Fork / Join机器旨在实现此过程的自动化。 JEP 107:集合的批量数据操作 几乎直接声明Collection接口的默认方法#parallelStream()使用Fork / Join实现自身。 但仍然没有关于公共池。 并行实现基于Java 7中引入的java.util.concurrency Fork / Join实现。 因此: Collection#parallelStream() 。 类数组(Javadoc) 直接指出使用公共池的多次。 ForkJoin公共池用于执行任何并行任务。 所以我的问题是: 在哪里说ForkJoinPool#commonPool()用于从Java 8 API获得的流上的并行操作?

java Fork / Join池,ExecutorService和CountDownLatch

我们在java中有三种不同的multithreading技术 – Fork / Join池,Executor Service和CountDownLatch Fork / Join pool ( http://www.javacodegeeks.com/2011/02/java-forkjoin-parallel-programming.html ) Fork / Join框架旨在使分而治之的算法易于并行化。 这种类型的算法非常适合于可以分为两个或更多相同类型的子问题的问题。 他们使用递归将问题分解为简单的任务,直到这些变得足够简单直接解决。 然后组合子问题的解决方案以给出原始问题的解决方案 ExecutorService是一个扩展Executor类并表示异步执行的接口。 它为我们提供了管理结束和检测异步任务进度的机制。 invokeAll() :执行给定的任务,返回一个Futures列表,保存状态和结果全部完成。 Future.isDone()对于返回列表的每个元素都为true。 CountDownLatch 🙁 http://examples.javacodegeeks.com/core-java/util/concurrent/countdownlatch-concurrent/java-util-concurrent-countdownlatch-example/ ) CountDownLatch用于同步,以允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。 我的假设: 在这两种替代方案中,只有在完成所有任务/线程后才能知道最终结果。 这三种选择是互补的还是互补的 ?

CompletableFuture的完成处理程序在哪个线程中执行?

我有一个关于CompletableFuture方法的问题: public CompletableFuture thenApply(Function fn) 事情是JavaDoc说的就是这样: 返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的结果作为所提供函数的参数执行。 有关特殊完成的规则​​,请参阅CompletionStage文档。 线程怎么样? 这将在哪个线程中执行? 如果未来由线程池完成怎么办?