当高吞吐量(3GB / s)文件系统可用时,如何使用Java中的多个线程读取文件

据我所知,对于普通的Spindle Drive系统,使用multithreading读取文件效率很低。

这是一个不同的情况,我有一个高吞吐量文件系统可供我使用,它提供高达3GB / s的读取速度,具有196个CPU内核和2TB RAM

单线程Java程序以最大85-100 MB / s的速度读取文件,因此我有可能比单线程更好。 我必须读取大小为1TB的文件,并且我有足够的RAM来加载它。

目前我使用以下或类似的东西,但需要用multithreading编写一些东西以获得更好的吞吐量:

Java 7文件:50 MB / s

List lines = Files.readAllLines(Paths.get(path), encoding); 

Java commons-io:48 MB / s

 List lines = FileUtils.readLines(new File("/path/to/file.txt"), "utf-8"); 

与番石榴相同:45 MB / s

 List lines = Files.readLines(new File("/path/to/file.txt"), Charset.forName("utf-8")); 

Java Scanner类:非常慢

 Scanner s = new Scanner(new File("filepath")); ArrayList list = new ArrayList(); while (s.hasNext()){ list.add(s.next()); } s.close(); 

我希望能够以正确的排序顺序加载文件并尽可能快地构建相同的ArrayList。

还有一个问题看起来类似,但它实际上是不同的,因为:问题是讨论multithreading文件I / O在物理上不可能有效的系统,但由于技术进步,我们现在有系统旨在支持高吞吐量I / O,因此限制因素是CPU / SW,可以通过multithreadingI / O来克服。

另一个问题没有回答如何将代码写入multithreadingI / O.

以下是使用多个线程读取单个文件的解决方案。

将文件分成N个块,读取线程中的每个块,然后按顺序合并它们。 注意跨越块边界的线。 这是用户slaks建议的基本思想

在单个20 GB文件的multithreading实现下面的基准标记:

1线程:50秒:400 MB / s

2个线程:30秒:666 MB / s

4个线程:20秒:1GB / s

8个线程:60秒:333 MB / s

等效Java7 readAllLines():400秒:50 MB / s

注意:这可能仅适用于旨在支持高吞吐量I / O的系统,而不适用于通常的个人计算机

 package filereadtests; import java.io.*; import static java.lang.Math.toIntExact; import java.nio.*; import java.nio.channels.*; import java.nio.charset.Charset; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FileRead implements Runnable { private FileChannel _channel; private long _startLocation; private int _size; int _sequence_number; public FileRead(long loc, int size, FileChannel chnl, int sequence) { _startLocation = loc; _size = size; _channel = chnl; _sequence_number = sequence; } @Override public void run() { try { System.out.println("Reading the channel: " + _startLocation + ":" + _size); //allocate memory ByteBuffer buff = ByteBuffer.allocate(_size); //Read file chunk to RAM _channel.read(buff, _startLocation); //chunk to String String string_chunk = new String(buff.array(), Charset.forName("UTF-8")); System.out.println("Done Reading the channel: " + _startLocation + ":" + _size); } catch (Exception e) { e.printStackTrace(); } } //args[0] is path to read file //args[1] is the size of thread pool; Need to try different values to fing sweet spot public static void main(String[] args) throws Exception { FileInputStream fileInputStream = new FileInputStream(args[0]); FileChannel channel = fileInputStream.getChannel(); long remaining_size = channel.size(); //get the total number of bytes in the file long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads //Max allocation size allowed is ~2GB if (chunk_size > (Integer.MAX_VALUE - 5)) { chunk_size = (Integer.MAX_VALUE - 5); } //thread pool ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1])); long start_loc = 0;//file pointer int i = 0; //loop counter while (remaining_size >= chunk_size) { //launches a new thread executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i)); remaining_size = remaining_size - chunk_size; start_loc = start_loc + chunk_size; i++; } //load the last remaining piece executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i)); //Tear Down executor.shutdown(); //Wait for all threads to finish while (!executor.isTerminated()) { //wait for infinity time } System.out.println("Finished all threads"); fileInputStream.close(); } } 

您应该首先尝试java 7 Files.readAllLines:

 List lines = Files.readAllLines(Paths.get(path), encoding); 

使用multithreading方法可能不是一个好选择,因为它会强制文件系统执行随机读取(这在文件系统上永远不是一件好事)