优化许多文件的并行处理
我有一个程序处理大量文件,每个文件需要完成两件事:首先,读取并处理文件的某些部分,然后存储生成的MyFileData
。 第一部分可以并行化,第二部分不能。
按顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后它会工作一点,然后它会发出另一个请求,然后再次等待…
我做了以下
class MyCallable implements Callable { MyCallable(File file) { this.file = file; } public MyFileData call() { return someSlowOperation(file); } private final File file; } for (File f : files) futures.add(executorService.submit(new MyCallable(f))); for (Future f : futures) sequentialOperation(f.get());
它帮助了很多。 但是,我想改进两件事:
-
sequentialOperation
以固定顺序执行,而不是先处理可用的结果。 我该怎么改变它? -
有数千个文件需要处理,启动数千个磁盘请求可能导致磁盘丢失。 通过使用
Executors.newFixedThreadPool(10)
我限制了这个数字,但是我正在寻找更好的东西。 理想情况下,它应该是自我调整的,以便它在不同的计算机上工作最佳(例如,当RAID和/或NCQ可用时发出更多请求等)。 我不认为它可能基于找到硬件配置,但测量处理速度和基于它的优化应该是可能的。 任何想法?
sequentialOperation以固定顺序执行,而不是先处理可用的结果。 我该怎么改变它?
这正是CompletionService的作用:它并行处理任务并在完成任务时返回它们,而不管提交顺序如何。
简化(未测试)示例:
int NUM_THREADS = Runtime.getRuntime().availableProcessors(); ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); CompletionService completionService = new ExecutorCompletionService (executor); for (File f : files) futures.add(completionService.submit(new MyCallable(f))); for(int i = 0; i < futures.size(); i++) { Future next = completionService.take(); sequentialOperation(next.get()); }
有数千个文件需要处理,启动数千个磁盘请求可能导致磁盘丢失。 通过使用Executors.newFixedThreadPool(10)我限制了这个数字,但是我正在寻找更好的东西。
我不是百分百肯定那个。 我想这取决于你有多少磁盘,但我认为磁盘访问部分不应该分成太多的线程(每个磁盘一个线程可能是明智的):如果许multithreading同时访问一个磁盘,它将花费更多的时间寻求而不是阅读。
sequentialOperation以固定顺序执行,而不是先处理可用的结果。 我该怎么改变它?
假设:每个someSlowOperation(file);
调用将花费不同的时间,因此,您希望在收到MyFileData
立即处理,而不是与另一个sequentialOperation
。
您可以通过设置生产者/消费者队列来实现此目的。
生成器是您在示例中执行的callables
,添加的位用于将结果添加到等待处理的工作队列中。
Consumer是sequentialOperation()
调用 – 它在自己的线程中运行,只有一个。 所有这个线程都是占用队列的头部,并处理它,重复直到程序结束。
这样,您可以最大限度地利用计算机上的所有资源。
带有一些示例代码的相关post: 使用队列的生产者/消费者线程
编辑:我想你可能想要一个快速的样本,因为它对以前从未做过的人来说非常不透明
public class Main { private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10); private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1); private final LinkedBlockingQueue queue = new LinkedBlockingQueue();//or some other impl abstract class Producer implements Runnable{ private final File file; Producer(File file) { this.file = file; } public void run() { MyData result = someLongAssOperation(file); queue.offer(result); } public abstract void someLongAssOperation(File file); } abstract class Consumer implements Runnable { public void run() { while (true) { sequentialOperation(queue.take()); } } public abstract void sequentialOperation(MyData data); } private void start() { consumerExecutor.submit(new Consumer(){ //implement sequentialOperation here }); for (File f : files) { producerExecutor.submit(new Producer(file) { //implement the someLongAssOperation() }); } } public static void main(String[] args) { new Main().start(); } }