从大文件中逐个读取30Million用户ID

我正在尝试使用Java读取一个非常大的文件。 那个大文件会有这样的数据,这意味着每一行都有一个用户ID。

149905320 1165665384 66969324 886633368 1145241312 286585320 1008665352 

在那个大文件中,将有大约3000万用户ID。 现在我试图从那个大文件中一个一个地读取所有用户id。 意味着每个用户ID应该只从该大文件中选择一次。 例如,如果我有30万个用户ID,那么它应该使用multithreading代码只打印一次3000万用户ID。

下面是我的代码,它是一个运行10个线程的multithreading代码,但是使用下面的程序,我无法确保每个用户ID只被选中一次。

 public class ReadingFile { public static void main(String[] args) { // create thread pool with given size ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { service.submit(new FileTask()); } } } class FileTask implements Runnable { @Override public void run() { BufferedReader br = null; try { br = new BufferedReader(new FileReader("D:/abc.txt")); String line; while ((line = br.readLine()) != null) { System.out.println(line); //do things with line } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } } 

任何人都可以帮我吗? 我做错了什么? 什么是最快的方法呢?

你真的无法改进让一个线程按顺序读取文件,假设你没有做过像在多个磁盘上条带化文件那样的事情。 使用一个线程,您可以执行一次搜索,然后执行一次长读取; 对于多个线程,您将获得线程导致多次搜索,因为每个线程都获得对磁盘头的控制。

编辑:这是一种并行化处理的方法,同时仍然使用串行I / O来读取行。 它使用BlockingQueue在线程之间进行通信; FileTask向队列添加行, CPUTask读取它们并处理它们。 这是一个线程安全的数据结构,因此无需向其添加任何同步。 您正在使用put(E e)将字符串添加到队列中,因此如果队列已满(它可以容纳200个字符串,如ReadingFile中的声明中所定义), FileTask阻塞直到空间释放; 同样,您正在使用take()从队列中删除项目,因此CPUTask将阻塞直到项目可用。

 public class ReadingFile { public static void main(String[] args) { final int threadCount = 10; // BlockingQueue with a capacity of 200 BlockingQueue queue = new ArrayBlockingQueue<>(200); // create thread pool with given size ExecutorService service = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < (threadCount - 1); i++) { service.submit(new CPUTask(queue)); } // Wait til FileTask completes service.submit(new FileTask(queue)).get(); service.shutdownNow(); // interrupt CPUTasks // Wait til CPUTasks terminate service.awaitTermination(365, TimeUnit.DAYS); } } class FileTask implements Runnable { private final BlockingQueue queue; public FileTask(BlockingQueue queue) { this.queue = queue; } @Override public void run() { BufferedReader br = null; try { br = new BufferedReader(new FileReader("D:/abc.txt")); String line; while ((line = br.readLine()) != null) { // block if the queue is full queue.put(line); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } } class CPUTask implements Runnable { private final BlockingQueue queue; public CPUTask(BlockingQueue queue) { this.queue = queue; } @Override public void run() { String line; while(true) { try { // block if the queue is empty line = queue.take(); // do things with line } catch (InterruptedException ex) { break; // FileTask has completed } } // poll() returns null if the queue is empty while((line = queue.poll()) != null) { // do things with line; } } } 

我们讨论的是平均315 MB的文件,其中的行由新行分隔。 我认为这很容易融入记忆中。 暗示用户名中没有特定顺序必须保存。 所以我建议使用以下算法:

  • 获取文件长度
  • 将每个文件的第10个复制到一个字节缓冲区(二进制副本应该很快)
  • 启动一个线程来处理每个缓冲区
  • 每个线程处理其区域中除第一个和最后一个之外的所有行。
  • 完成后,每个线程必须返回其数据中的第一个和最后一个partitial行,
  • 每个线程的“最后一个”必须与处理下一个文件块的“第一个”重新组合,因为您可能已经切断了一行。 然后必须处理这些令牌。

1.7中引入的Fork Join API非常适合这个用例。 查看http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 。 如果你搜索,你会发现很多例子。