优化许多文件的并行处理

我有一个程序处理大量文件,每个文件需要完成两件事:首先,读取并处理文件的某些部分,然后存储生成的MyFileData 。 第一部分可以并行化,第二部分不能。

按顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后它会工作一点,然后它会发出另一个请求,然后再次等待…

我做了以下

 class MyCallable implements Callable { MyCallable(File file) { this.file = file; } public MyFileData call() { return someSlowOperation(file); } private final File file; } for (File f : files) futures.add(executorService.submit(new MyCallable(f))); for (Future f : futures) sequentialOperation(f.get()); 

它帮助了很多。 但是,我想改进两件事:

  • sequentialOperation以固定顺序执行,而不是先处理可用的结果。 我该怎么改变它?

  • 有数千个文件需要处理,启动数千个磁盘请求可能导致磁盘丢失。 通过使用Executors.newFixedThreadPool(10)我限制了这个数字,但是我正在寻找更好的东西。 理想情况下,它应该是自我调整的,以便它在不同的计算机上工作最佳(例如,当RAID和/或NCQ可用时发出更多请求等)。 我不认为它可能基于找到硬件配置,但测量处理速度和基于它的优化应该是可能的。 任何想法?

sequentialOperation以固定顺序执行,而不是先处理可用的结果。 我该怎么改变它?

这正是CompletionService的作用:它并行处理任务并在完成任务时返回它们,而不管提交顺序如何。

简化(未测试)示例:

 int NUM_THREADS = Runtime.getRuntime().availableProcessors(); ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); CompletionService completionService = new ExecutorCompletionService(executor); for (File f : files) futures.add(completionService.submit(new MyCallable(f))); for(int i = 0; i < futures.size(); i++) { Future next = completionService.take(); sequentialOperation(next.get()); } 

有数千个文件需要处理,启动数千个磁盘请求可能导致磁盘丢失。 通过使用Executors.newFixedThreadPool(10)我限制了这个数字,但是我正在寻找更好的东西。

我不是百分百肯定那个。 我想这取决于你有多少磁盘,但我认为磁盘访问部分不应该分成太多的线程(每个磁盘一个线程可能是明智的):如果许multithreading同时访问一个磁盘,它将花费更多的时间寻求而不是阅读。

sequentialOperation以固定顺序执行,而不是先处理可用的结果。 我该怎么改变它?

假设:每个someSlowOperation(file); 调用将花费不同的时间,因此,您希望在收到MyFileData立即处理,而不是与另一个sequentialOperation

您可以通过设置生产者/消费者队列来实现此目的。

生成器是您在示例中执行的callables ,添加的位用于将结果添加到等待处理的工作队列中。

Consumer是sequentialOperation()调用 – 它在自己的线程中运行,只有一个。 所有这个线程都是占用队列的头部,并处理它,重复直到程序结束。

这样,您可以最大限度地利用计算机上的所有资源。

带有一些示例代码的相关post: 使用队列的生产者/消费者线程

编辑:我想你可能想要一个快速的样本,因为它对以前从未做过的人来说非常不透明

 public class Main { private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10); private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1); private final LinkedBlockingQueue queue = new LinkedBlockingQueue();//or some other impl abstract class Producer implements Runnable{ private final File file; Producer(File file) { this.file = file; } public void run() { MyData result = someLongAssOperation(file); queue.offer(result); } public abstract void someLongAssOperation(File file); } abstract class Consumer implements Runnable { public void run() { while (true) { sequentialOperation(queue.take()); } } public abstract void sequentialOperation(MyData data); } private void start() { consumerExecutor.submit(new Consumer(){ //implement sequentialOperation here }); for (File f : files) { producerExecutor.submit(new Producer(file) { //implement the someLongAssOperation() }); } } public static void main(String[] args) { new Main().start(); } }