通过多个线程访问文件

我想通过10个线程访问一个大文件(文件大小可能从30 MB到1 GB),然后处理文件中的每一行,并通过10个线程将它们写入另一个文件。 如果我只使用一个线程来访问IO,则阻止其他线程。 处理花费一些时间几乎相当于从文件系统中读取一行代码。 还有一个约束,输出文件中的数据应与输入文件的数据顺序相同。

我希望你对这个系统的设计有所了解。 是否有任何现有的API支持并发访问文件?

写入同一文件也可能导致死锁。

如果我关心时间限制,请建议如何实现这一点。

  • 你应该从文件阅读中 抽象出来。 创建一个读取文件的类,并将内容分派给不同数量的线程。

该类不应该调度字符串,它应该将它们包装在包含元信息Line类中,例如行号 ,因为您要保留原始序列。

  • 您需要一个处理类 ,它可以对收集的数据进行实际操作 。 在你的情况下,没有工作要做。 该类只存储信息,您可以在某一天扩展它以执行其他操作(例如,反转字符串。附加一些其他字符串,…)

  • 然后你需要一个merger类 ,它在处理线程上进行某种多路合并排序 ,并按顺序 收集对 Line 实例的 所有引用

合并类也可以将数据写回文件,但要保持代码清洁……

  • 我建议创建一个输出类 ,它再次从所有文件处理和东西中抽象出来。

当然,如果你缺少主内存,你需要很多内存。 您需要一种基于流的方法 ,这种方法可以在内部工作,以减少内存开销。


更新基于流的方法

除了以下情况外,保持不变:

Reader线程将读取的数据泵入Balloon 。 此气球可以容纳一定数量的Line实例(数字越大,消耗的主内存越多)。

处理线程从气球中取出Line s,读取器将更多的线条泵入气球,因为它变得更空。

merger类从上面的处理线程获取行,并且writer将数据写回文件。

也许你应该在I / O线程中使用FileChannel ,因为它更适合读取大文件,并且在处理文件时可能消耗更少的内存(但这只是估计的猜测)。

我会从三个线程开始。

  1. 读取数据的读取器线程,将其分成“行”并将它们放入有界阻塞队列(Q1),
  2. 从Q1读取的处理线程,进行处理并将它们放入第二个有界阻塞队列(Q2),和
  3. 从Q2读取并写入磁盘的写入程序线程。

当然,我还要确保输出文件与输入文件在物理上不同的磁盘上。

如果处理速度往往比I / O(监视队列大小)慢,那么您可以开始尝试两个或更多并行“处理器”,这些“处理器”在读取和写入数据方面是同步的。

任何类型的IO,无论是磁盘,网络等,通常都是瓶颈。

通过使用多个线程,您正在加剧问题,因为很可能一次只有一个线程可以访问IO资源。

最好使用一个线程来读取,将信息传递给工作线程池,然后直接从那里写入。 但是,如果工人们写到同一个地方会再次出现瓶颈,因为只有一个人可以拥有锁。 通过将数据传递给单个编写器线程轻松修复。

简而言之”:

单个读取器线程写入BlockingQueue等,这给它一个自然有序的序列。

然后工作线程线程在队列上等待数据,记录其序列号。

然后,工作线程将处理后的数据写入另一个BlockingQueue,这次附加其原始序列号

写入程序线程可以获取数据并按顺序写入。

这可能会产生最快的实施。

您可以使用java中的FileChannel执行此操作,该文件允许多个线程访问同一文件。 FileChannel允许您从一个位置开始读写。 请参阅以下示例代码:

 import java.io.*; import java.nio.*; import java.nio.channels.*; public class OpenFile implements Runnable { private FileChannel _channel; private FileChannel _writeChannel; private int _startLocation; private int _size; public OpenFile(int loc, int sz, FileChannel chnl, FileChannel write) { _startLocation = loc; _size = sz; _channel = chnl; _writeChannel = write; } public void run() { try { System.out.println("Reading the channel: " + _startLocation + ":" + _size); ByteBuffer buff = ByteBuffer.allocate(_size); if (_startLocation == 0) Thread.sleep(100); _channel.read(buff, _startLocation); ByteBuffer wbuff = ByteBuffer.wrap(buff.array()); int written = _writeChannel.write(wbuff, _startLocation); System.out.println("Read the channel: " + buff + ":" + new String(buff.array()) + ":Written:" + written); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { FileOutputStream ostr = new FileOutputStream("OutBigFile.dat"); FileInputStream str = new FileInputStream("BigFile.dat"); String b = "Is this written"; //ostr.write(b.getBytes()); FileChannel chnl = str.getChannel(); FileChannel write = ostr.getChannel(); ByteBuffer buff = ByteBuffer.wrap(b.getBytes()); write.write(buff); Thread t1 = new Thread(new OpenFile(0, 10000, chnl, write)); Thread t2 = new Thread(new OpenFile(10000, 10000, chnl, write)); Thread t3 = new Thread(new OpenFile(20000, 10000, chnl, write)); t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.join(); write.force(false); str.close(); ostr.close(); } } 

在此示例中,有三个线程读取同一文件并写入同一文件并且不冲突。 此示例中的此逻辑未考虑所分配的大小不必以行结束等结束。您将根据数据找到正确的逻辑。

我以前遇到过类似的情况,我处理它的方式是这样的:

逐行读取主​​线程中的文件,并将该行的处理提交给执行程序。 ExecutorService的合理起点就在这里。 如果您计划使用固定的no线程,则可能对Executors类中的Executors.newFixedThreadPool(10)工厂方法感兴趣。 关于这个话题的javadoc也不错。

基本上,我提交所有作业,调用shutdown然后在主线程中继续按顺序写入输出文件,以便返回所有Future 。 你可以利用Future类的’ get()方法的阻塞性来确保顺序,但你真的不应该使用multithreading来编写,就像你不会用它来阅读一样。 说得通?

但是, 1 GB数据文件? 如果我是你,我首先会对有意义地分解这些文件感兴趣。

PS :我故意避免在答案中使用代码,因为我希望OP自己尝试一下。 已经提供了足够的指向特定类,API方法和示例的指针。

请注意,理想的线程数受硬件架构和其他内容的限制(您可以考虑咨询线程池以计算最佳线程数)。 假设“10”是一个好数字,我们继续。 =)

如果您正在寻找性能,可以执行以下操作:

  • 使用您拥有的线程读取文件,并根据您的业务规则处理每个线程。 保留一个控件变量,指示要在输出文件中插入的下一个预期行。

  • 如果下一个预期的行已完成处理,请将其附加到缓冲区(队列)(如果您能找到直接在输出文件中插入的方法,那将是理想的,但您会遇到锁定问题)。 否则,将此“future”行存储在二元搜索树中,按行排序。 二进制搜索树为搜索和插入提供了“O(log n)”的时间复杂度,这对于您的上下文来说非常快。 继续填充树,直到下一个“预期”行完成处理。

激活将负责打开输出文件的线程,定期使用缓冲区并将行写入文件。

另外,跟踪要插入文件中的BST的“次要”预期节点。 在开始搜索之前,您可以使用它来检查未来的行是否在BST内。

  • 当下一个预期的行完成处理时,插入到Queue中并validation下一个元素是否在binary-search-tree中。 如果下一行在树中,则从树中删除节点并将节点的内容附加到队列,如果下一行已经在树内,则重复搜索。
  • 重复此过程,直到所有文件都处理完毕,树为空并且队列为空。

这种方法使用-O(n)来读取文件(但是并行化) – O(1)将有序行插入到队列-O(Logn)* 2中以读取和写入二进制搜索树-O( n)编写新文件

加上业务规则和I / O操作的成本。

希望能帮助到你。

其中一种可能的方法是创建一个单独的线程来读取输入文件并将读取行放入阻塞队列。 多个线程将等待来自此队列的数据,处理数据。

另一种可能的解决方案可能是将文件分成块并将每个块分配给单独的线程。

为避免阻塞,您可以使用异步IO。 您还可以从面向模式的软件架构第2卷看一下Proactor模式

想到Spring Batch 。

维护订单需要一个后期处理步骤,即将读取的索引/键存储在处理上下文中。处理逻辑也应该将处理后的信息存储在上下文中。一旦完成处理,您就可以对列表进行后处理并写入文件。

但请注意OOM问题。

由于订单需要维护,所以问题本身就说读写不能并行,因为它是顺序进程,你可以并行做的唯一事情就是处理记录但是只用一个写入器也解决不了多少。

这是一个设计方案:

  1. 使用One Thread t1读取文件并将数据存储到LinkedBlockingQueue Q1中
  2. 使用另一个线程t2从Q1读取数据并放入另一个LinkedBlockingQueue Q2
  3. 线程t3从Q2读取数据并写入文件。
  4. 为了确保您不会遇到OutofMemoryError,您应该初始化具有适当大小的队列
  5. 您可以使用CyclicBarrier来确保所有线程完成其操作
  6. 此外,您可以在CyclicBarrier中设置Action,您可以在其中执行后期处理任务。

祝你好运,希望你能得到最好的设计。

干杯!!

我过去遇到过类似的问题。 我必须从单个文件中读取数据,处理它并将结果写入其他文件。 由于加工部分非常重。 所以我试图使用多个线程。 这是我为解决我的问题而遵循的设计:

  • 使用主程序作为主程序,一次读取整个文件(但不要开始处理)。 使用序列顺序为每一行创建一个数据对象。
  • 在main中使用一个priorityblockingqueue say queue,将这些数据对象添加到其中。 在每个线程的构造函数中共享此队列的引用。
  • 创建不同的处理单元,即将侦听此队列的线程。 当我们将数据对象添加到此队列时,我们将调用notifyall方法。 所有线程都将单独处理。
  • 处理完毕后,将所有结果放在单个映射中,并将结果作为序列号放入。
  • 当队列为空且所有线程都空闲时,意味着处理完成。 停止线程。 迭代地图并将结果写入文件