使用 Java 多线程(生产者-消费者模型)读取和写入文件

我被困在这里了。有人可以解释一下,为什么在下面的代码中消费者线程会先于生产者线程运行吗?生产者还没有放入任何内容时,消费者线程怎么能够运行?是程序写错了吗?

实现要求:为给定文件夹中的每个文件各启动一对生产者/消费者线程。

例如,如果指定的文件夹中有3个文件,那么每个文件都必须启动2个线程(生产者/消费者),因此线程总数为6。

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/**
 * One-slot hand-off cell shared by one producer and one consumer.
 *
 * <p>{@link #setContents(int)} blocks while the slot is full and
 * {@link #getContents()} blocks while it is empty; both use the monitor of this
 * instance. The cell also carries the file the pair of threads is working on.
 */
class sharedInt {
    private int syncUponInt;
    private boolean available = false;
    private File processingFile;
    // Static: every instance shares this listing and each constructor call overwrites it.
    private static File[] listOfFile;

    sharedInt(File[] totalList) {
        listOfFile = totalList;
    }

    /** Returns the number of files in the shared listing, or 0 when no listing was set. */
    public int getTotalCount() {
        return listOfFile == null ? 0 : listOfFile.length;
    }

    public static File[] getListOfFile() {
        return listOfFile;
    }

    public static void setListOfFile(File[] listOfFile) {
        sharedInt.listOfFile = listOfFile;
    }

    public File getProcessingFile() {
        return processingFile;
    }

    public void setProcessingFile(File processingFile) {
        this.processingFile = processingFile;
    }

    /**
     * Blocks until a value has been stored, empties the slot and returns the value.
     *
     * <p>An interrupt does not abort the wait; it is remembered and the thread's
     * interrupt status is restored before returning.
     */
    public synchronized int getContents() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Re-interrupting here would make the next wait() throw immediately.
                    interrupted = true;
                }
            }
            available = false;
            notify();
            return syncUponInt;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the slot is empty, stores {@code value} and wakes a waiting thread.
     *
     * <p>An interrupt does not abort the wait; the interrupt status is restored
     * before returning.
     */
    public synchronized void setContents(int value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            syncUponInt = value;
            available = true;
            notify();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

/** Publishes its number into the shared cell, then reads and prints the cell's file. */
class Producer1 extends Thread {
    private sharedInt cubbyhole;
    private int number;

    public Producer1(sharedInt c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    @Override
    public void run() {
        // The value is published BEFORE the message below is printed, so the
        // consumer may legitimately print its "got" line first.
        cubbyhole.setContents(this.number);
        System.out.println("Producer " + Thread.currentThread() + "put: " + this.number
                + "processing file is" + cubbyhole.getProcessingFile());
        // try-with-resources closes the file even when reading fails.
        try (RandomAccessFile raf = new RandomAccessFile(cubbyhole.getProcessingFile(), "r")) {
            StringBuilder sb = new StringBuilder();
            String line;
            // readLine() drops line terminators, so lines are concatenated back to back.
            while ((line = raf.readLine()) != null) {
                sb.append(line);
            }
            System.out.println(sb.toString());
        } catch (IOException e) {
            // Covers FileNotFoundException as well; report instead of hiding the failure.
            System.err.println("Producer " + getName() + " could not read "
                    + cubbyhole.getProcessingFile() + ": " + e);
        }
    }
}

/** Takes the number out of the shared cell and prints it with the cell's file. */
class Consumer1 extends Thread {
    private sharedInt cubbyhole;

    public Consumer1(sharedInt c) {
        cubbyhole = c;
    }

    @Override
    public void run() {
        System.out.println("Consumer " + Thread.currentThread() + "got: " + cubbyhole.getContents()
                + "processing file is" + cubbyhole.getProcessingFile());
    }
}

/** Starts one producer/consumer pair, with its own shared cell, per entry of a folder. */
public class FileManagementApp {
    /**
     * @param args optional; {@code args[0]} overrides the default folder {@code C:\file\output}
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("1. Please enter the path of the ...");
        File folder = new File(args.length > 0 ? args[0] : "C:\\file\\output");
        File[] fileList = folder.listFiles();
        if (fileList == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            System.err.println("Not a readable directory: " + folder);
            return;
        }
        List<Producer1> pList = new ArrayList<>();
        List<Consumer1> cList = new ArrayList<>();
        for (int i = 0; i < fileList.length; i++) {
            sharedInt c = new sharedInt(fileList);
            c.setProcessingFile(fileList[i]);

            Producer1 p1 = new Producer1(c, i);
            p1.setName("Producer--" + i);
            pList.add(p1);

            Consumer1 c1 = new Consumer1(c);
            c1.setName("Consumer--" + i);
            cList.add(c1);

            p1.start();
            c1.start();
        }
    }
}

输出: –

 1. Please enter the path of the ... Consumer Thread[Consumer--0,5,main]got: 0processing file isC:\file\output\0.A.txt Producer Thread[Producer--0,5,main]put: 0processing file isC:\file\output\0.A.txt Producer Thread[Producer--1,5,main]put: 1processing file isC:\file\output\1.A.txt Producer Thread[Producer--2,5,main]put: 2processing file isC:\file\output\2.A.txt Consumer Thread[Consumer--1,5,main]got: 1processing file isC:\file\output\1.A.txt fg abc Consumer Thread[Consumer--2,5,main]got: 2processing file isC:\file\output\2.A.txt de 

编辑:-

我把代码修改成了下面这样,现在能够使用生产者-消费者模型并发(多线程)地读取和写入文件。

 import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/**
 * One-slot hand-off cell through which a producer passes the bytes of a file to
 * a consumer. {@link #put(Vector)} blocks while the slot is full and
 * {@link #get()} blocks while it is empty; both use this instance's monitor.
 */
class SharedInteger {
    private boolean available = false;
    public File processingFile;
    public long totalNoOfSplits;
    public Vector<Byte> vectorBytes;
    private File[] listOfFiles;

    SharedInteger(File[] totalList) {
        listOfFiles = totalList;
    }

    /**
     * Blocks until a value has been published, empties the slot and returns it.
     *
     * @return the published vector; {@code null} when the producer published
     *     {@code null} to signal that it could not read the file
     */
    public synchronized Vector<Byte> get() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting; re-interrupting now would make wait() throw at once.
                    interrupted = true;
                }
            }
            available = false;
            notify();
            return vectorBytes;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the slot is empty, stores {@code value} and wakes a waiting thread.
     *
     * @param value bytes to hand over, or {@code null} to signal a failed read
     */
    public synchronized void put(Vector<Byte> value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            vectorBytes = value;
            available = true;
            notify();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

/** Reads the whole of {@code processingFile} and publishes its bytes to the shared cell. */
class Producer extends Thread {
    private SharedInteger sharedInteger;

    public Producer(SharedInteger c) {
        sharedInteger = c;
    }

    @Override
    public void run() {
        Vector<Byte> result = null;
        try (FileInputStream in = new FileInputStream(sharedInteger.processingFile)) {
            Vector<Byte> bytes = new Vector<>();
            byte[] buffer = new byte[8192];
            int count;
            // read() returning -1 is the end-of-file signal; available() is only an estimate.
            while ((count = in.read(buffer)) != -1) {
                for (int i = 0; i < count; i++) {
                    bytes.add(buffer[i]);
                }
            }
            result = bytes;
        } catch (IOException e) {
            System.err.println(getName() + ": cannot read " + sharedInteger.processingFile + ": " + e);
        } finally {
            // Always publish (null on failure); otherwise the consumer would wait forever.
            sharedInteger.put(result);
        }
    }
}

/**
 * Takes the bytes from the shared cell and writes them to a file of the same
 * name inside the sibling directory whose name is the source directory's name
 * followed by {@code "1"}. The target directory is not created here.
 */
class Consumer extends Thread {
    private SharedInteger sharedInteger;
    private FileOutputStream fos;

    public Consumer(SharedInteger c) {
        sharedInteger = c;
    }

    @Override
    public void run() {
        Vector<Byte> v = sharedInteger.get();
        if (v == null) {
            // The producer could not read the source; nothing to write.
            return;
        }
        File target = targetFor(sharedInteger.processingFile);
        // The stream is opened only once data is available and is always closed.
        try (FileOutputStream out = new FileOutputStream(target)) {
            fos = out;
            writeToAFile(v);
        } catch (IOException e) {
            System.err.println(getName() + ": cannot write " + target + ": " + e);
        } finally {
            fos = null;
        }
    }

    /** Builds the output location without hard-coding a path separator. */
    private static File targetFor(File source) {
        File parent = source.getAbsoluteFile().getParentFile();
        String parentPath = parent == null ? "" : parent.getPath();
        return new File(parentPath + "1", source.getName());
    }

    /**
     * Writes every byte of {@code contents} to the currently open output stream,
     * printing the current thread once per byte.
     *
     * @throws IOException if no output stream is open or a write fails
     */
    public void writeToAFile(Vector<Byte> contents) throws IOException {
        if (fos == null) {
            throw new IOException("output stream is not open");
        }
        for (int i = 0; i < contents.size(); i++) {
            System.out.println(Thread.currentThread());
            fos.write(contents.get(i));
        }
        fos.flush();
    }
}

/** Starts one producer/consumer pair, with its own shared cell, per regular file of a folder. */
public class ProducerConsumerTest {
    /**
     * @param args optional; {@code args[0]} overrides the default folder {@code C:\file\output}
     * @throws FileNotFoundException if the folder cannot be listed
     */
    public static void main(String[] args) throws FileNotFoundException {
        File folder = new File(args.length > 0 ? args[0] : "C:\\file\\output");
        File[] fileList = folder.listFiles();
        if (fileList == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            throw new FileNotFoundException("Not a readable directory: " + folder);
        }
        List<Producer> pList = new ArrayList<>();
        List<Consumer> cList = new ArrayList<>();
        for (int i = 0; i < fileList.length; i++) {
            if (!fileList[i].isFile()) {
                continue; // sub-directories cannot be copied byte by byte
            }
            SharedInteger c = new SharedInteger(fileList);
            c.processingFile = fileList[i];

            Producer p1 = new Producer(c);
            p1.setName("Producer--" + i);
            pList.add(p1);
            p1.start();

            Consumer c1 = new Consumer(c);
            c1.setName("Consumer--" + i);
            cList.add(c1);
            c1.start();
        }
    }
}

嗯,代码里确实有一些可疑的地方。不过,先看看你的 run() 方法:

 // Producer1 public void run() { cubbyhole.setContents(this.number); Vector vectorList = new Vector(); System.out.println("Producer " + this.currentThread() + "put: " + this.number + "processing file is" + cubbyhole.getProcessingFile()); RandomAccessFile raf = null; try { // ... } // Consumer1 public void run() { int value = 0; System.out.println("Consumer " + this.currentThread() + "got: " + cubbyhole.getContents() + "processing file is" + cubbyhole.getProcessingFile()); } 

一旦生产者调用了 setContents(int)(进而调用了 notify()),消费者就可以继续执行。仅仅因为先看到消费者的控制台输出,并不能说明消费者先运行。两个线程的打印是在没有同步的情况下并发进行的,因此不能依赖输出的先后顺序。

编辑:按照你的要求,使用 Vector、wait()/notify() 以及每个文件两个线程来实现。但请记住,还有更好的实现方式(请参阅评论):

 import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;

/**
 * Merges the files of a directory into {@code merge.txt} using one reader and
 * one writer thread per file. Readers run concurrently; writers append in the
 * order of the directory listing, coordinated with {@code wait()}/{@code notifyAll()}.
 */
public class FileMerger {
    // Index of the writer whose turn it is; read and written under synchronized (this).
    private volatile int currentWriterId = 0;

    /**
     * @param args {@code args[0]}: target directory; {@code args[1]}: suffix
     *     (without the dot) of the files to merge
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FileMerger <directory> <suffix>");
            return;
        }
        // 1st argument: target directory.
        File directory = new File(args[0]);
        // 2nd argument: merge files suffix.
        FilenameFilter filter = (dir, name) -> name.endsWith("." + args[1]);
        new FileMerger().merge(directory, filter);
    }

    /**
     * Appends the content of every file accepted by {@code filter} to
     * {@code <directory>/merge.txt}, one trailing newline per file, and returns
     * once all reader and writer threads have finished.
     *
     * <p>A file that cannot be read is reported on stderr and skipped; it does
     * not block the remaining writers.
     *
     * @throws IOException if the directory cannot be listed or {@code merge.txt}
     *     cannot be created (for example because it already exists)
     * @throws InterruptedException if the calling thread is interrupted while
     *     waiting for the worker threads
     */
    public void merge(File directory, FilenameFilter filter) throws IOException, InterruptedException {
        File[] files = directory.listFiles(filter);
        if (files == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            throw new IOException("Cannot list directory: " + directory);
        }
        int numberOfFiles = files.length;
        Path mergeFilePath = Paths.get(directory.getPath() + FileSystems.getDefault().getSeparator() + "merge.txt");
        Vector<String> fileContents = new Vector<>(Collections.<String>nCopies(numberOfFiles, null));
        Files.createFile(mergeFilePath);

        synchronized (this) {
            // Start the turn counter afresh so the instance can be reused.
            currentWriterId = 0;
        }

        List<Thread> workers = new ArrayList<>(2 * numberOfFiles);
        for (int i = 0; i < numberOfFiles; i++) {
            final int writerId = i;
            File file = files[i];
            CountDownLatch readWriteLatch = new CountDownLatch(1);

            // Reader.
            Thread reader = new Thread(() -> {
                try {
                    List<String> lines = Files.readAllLines(Paths.get(file.getPath()));
                    fileContents.set(writerId, String.join("\n", lines));
                } catch (IOException e) {
                    System.err.println("Skipping unreadable file " + file + ": " + e);
                } finally {
                    // Release the writer even on failure; otherwise it, and every
                    // writer queued behind it, would wait forever.
                    readWriteLatch.countDown();
                }
            });

            // Writer.
            Thread writer = new Thread(() -> {
                try {
                    // Wait for corresponding reader to finish.
                    readWriteLatch.await();
                    synchronized (this) {
                        // Wait for this writer's turn.
                        while (currentWriterId != writerId) {
                            wait();
                        }
                        try {
                            String content = fileContents.get(writerId);
                            if (content != null) { // null: the reader failed, nothing to append
                                // readAllLines decodes UTF-8, so encode with UTF-8 as well.
                                Files.write(mergeFilePath, (content + "\n").getBytes(StandardCharsets.UTF_8),
                                        StandardOpenOption.APPEND);
                            }
                        } catch (IOException e) {
                            System.err.println("Could not append " + file + ": " + e);
                        } finally {
                            // Pass the turn on even when the append failed.
                            currentWriterId++;
                            notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    // NOTE(review): an interrupted writer gives up without passing the
                    // turn on; nothing in this class interrupts the workers.
                    Thread.currentThread().interrupt();
                }
            });

            workers.add(reader);
            workers.add(writer);
            reader.start();
            writer.start();
        }

        // Return only once merge.txt is complete.
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
 import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/**
 * One-slot hand-off cell through which a producer passes the bytes of a file to
 * a consumer. {@link #put(Vector)} blocks while the slot is full and
 * {@link #get()} blocks while it is empty; both use this instance's monitor.
 */
class SharedInteger {
    private boolean available = false;
    public File processingFile;
    public long totalNoOfSplits;
    public Vector<Byte> vectorBytes;
    private File[] listOfFiles;

    SharedInteger(File[] totalList) {
        listOfFiles = totalList;
    }

    /**
     * Blocks until a value has been published, empties the slot and returns it.
     *
     * @return the published vector; {@code null} when the producer published
     *     {@code null} to signal that it could not read the file
     */
    public synchronized Vector<Byte> get() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting; re-interrupting now would make wait() throw at once.
                    interrupted = true;
                }
            }
            available = false;
            notify();
            return vectorBytes;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the slot is empty, stores {@code value} and wakes a waiting thread.
     *
     * @param value bytes to hand over, or {@code null} to signal a failed read
     */
    public synchronized void put(Vector<Byte> value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            vectorBytes = value;
            available = true;
            notify();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

/** Reads the whole of {@code processingFile} and publishes its bytes to the shared cell. */
class Producer extends Thread {
    private SharedInteger sharedInteger;

    public Producer(SharedInteger c) {
        sharedInteger = c;
    }

    @Override
    public void run() {
        Vector<Byte> result = null;
        try (FileInputStream in = new FileInputStream(sharedInteger.processingFile)) {
            Vector<Byte> bytes = new Vector<>();
            byte[] buffer = new byte[8192];
            int count;
            // read() returning -1 is the end-of-file signal; available() is only an estimate.
            while ((count = in.read(buffer)) != -1) {
                for (int i = 0; i < count; i++) {
                    bytes.add(buffer[i]);
                }
            }
            result = bytes;
        } catch (IOException e) {
            System.err.println(getName() + ": cannot read " + sharedInteger.processingFile + ": " + e);
        } finally {
            // Always publish (null on failure); otherwise the consumer would wait forever.
            sharedInteger.put(result);
        }
    }
}

/**
 * Takes the bytes from the shared cell and writes them to a file of the same
 * name inside the sibling directory whose name is the source directory's name
 * followed by {@code "1"}. The target directory is not created here.
 */
class Consumer extends Thread {
    private SharedInteger sharedInteger;
    private FileOutputStream fos;

    public Consumer(SharedInteger c) {
        sharedInteger = c;
    }

    @Override
    public void run() {
        Vector<Byte> v = sharedInteger.get();
        if (v == null) {
            // The producer could not read the source; nothing to write.
            return;
        }
        File target = targetFor(sharedInteger.processingFile);
        // The stream is opened only once data is available and is always closed.
        try (FileOutputStream out = new FileOutputStream(target)) {
            fos = out;
            writeToAFile(v);
        } catch (IOException e) {
            System.err.println(getName() + ": cannot write " + target + ": " + e);
        } finally {
            fos = null;
        }
    }

    /** Builds the output location without hard-coding a path separator. */
    private static File targetFor(File source) {
        File parent = source.getAbsoluteFile().getParentFile();
        String parentPath = parent == null ? "" : parent.getPath();
        return new File(parentPath + "1", source.getName());
    }

    /**
     * Writes every byte of {@code contents} to the currently open output stream,
     * printing the current thread once per byte.
     *
     * @throws IOException if no output stream is open or a write fails
     */
    public void writeToAFile(Vector<Byte> contents) throws IOException {
        if (fos == null) {
            throw new IOException("output stream is not open");
        }
        for (int i = 0; i < contents.size(); i++) {
            System.out.println(Thread.currentThread());
            fos.write(contents.get(i));
        }
        fos.flush();
    }
}

/** Starts one producer/consumer pair, with its own shared cell, per regular file of a folder. */
public class ProducerConsumerTest {
    /**
     * @param args optional; {@code args[0]} overrides the default folder {@code C:\file\output}
     * @throws FileNotFoundException if the folder cannot be listed
     */
    public static void main(String[] args) throws FileNotFoundException {
        File folder = new File(args.length > 0 ? args[0] : "C:\\file\\output");
        File[] fileList = folder.listFiles();
        if (fileList == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            throw new FileNotFoundException("Not a readable directory: " + folder);
        }
        List<Producer> pList = new ArrayList<>();
        List<Consumer> cList = new ArrayList<>();
        for (int i = 0; i < fileList.length; i++) {
            if (!fileList[i].isFile()) {
                continue; // sub-directories cannot be copied byte by byte
            }
            SharedInteger c = new SharedInteger(fileList);
            c.processingFile = fileList[i];

            Producer p1 = new Producer(c);
            p1.setName("Producer--" + i);
            pList.add(p1);
            p1.start();

            Consumer c1 = new Consumer(c);
            c1.setName("Consumer--" + i);
            cList.add(c1);
            c1.start();
        }
    }
}