目录正在查看java中的更改

我正在使用WatchService来监视目录中的更改,特别是在目录中创建新文件。 以下是我的代码 –

package watcher; import java.nio.file.*; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.OVERFLOW; import java.io.*; public class Watch { public static void main(String[] args) throws IOException { Path dir = Paths.get("c:\\mk\\"); WatchService service = FileSystems.getDefault().newWatchService(); WatchKey key = dir.register(service, ENTRY_CREATE); System.out.println("Watching directory: "+dir.toString()); for(;;){ WatchKey key1; try { key1 = service.take(); } catch (InterruptedException x) { break; } for (WatchEvent event: key1.pollEvents()) { WatchEvent.Kind kind = event.kind(); if (kind == OVERFLOW) { continue; } WatchEvent ev = (WatchEvent)event; Path filename = ev.context(); Path child = dir.resolve(filename); System.out.println("New file: "+child.toString()+" created."); try{ FileInputStream in = new FileInputStream(child.toFile()); System.out.println("File opened for reading"); in.close(); System.out.println("File Closed"); }catch(Exception x){ x.printStackTrace(); } } boolean valid = key.reset(); if (!valid) { break; } } } } 

当我在“mk”目录中创建文件时,我收到通知。 但是当我在这个目录中复制一些文件时,我在打开复制文件时遇到exception。

我的猜测是Windows Copier对话框仍然锁定了该文件,我无法打开该文件。 所以基本上我想知道的是如何获得通知文件已被其他进程关闭。

以上代码的输出如下 –

 Watching directory: c:\mk New file: c:\mk\New Text Document (2).txt created. File opened for reading File Closed New file: c:\mk\Config.class created. java.io.FileNotFoundException: c:\mk\Config.class (The process cannot access the file because it is being used by another process) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.(FileInputStream.java:138) at watcher.Watch.main(Watch.java:36) New file: c:\mk\New Text Document (3).txt created. File opened for reading File Closed 

文件“ New Text Document (2).txt ”和“ New Text Document (3).txt ”我已创建但文件“ Config.class ”我已从其他目录复制。

请帮帮我。

我通过实现算法得到了这个工作:Watcher线程将文件名放在BlockingQueue中,其他线程将轮询此队列,获取文件名,尝试几次打开文件。 如果文件被打开,Windows Copier已发布文件锁定,我们可以继续。 因此,当其他线程发现文件已被解锁时,其他线程会将此文件名放入已处理的队列中,我的应用程序将从该位置检索文件名。 另外一个线程在通过打开文件检查文件解锁时,如果它运行很长时间解锁文件,我们可以将这个文件名放回BlockingQueue并处理其他文件名,前者可以在以后处理。

解决方案:希望这可能有助于其他:

 package dirwatch; import java.nio.file.*; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.OVERFLOW; import static java.nio.file.LinkOption.*; import java.nio.file.attribute.*; import java.io.*; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class WatchDir { private final WatchService watcher; private final Map keys; private final boolean recursive; private boolean trace = false; private BlockingQueue fileProcessingQueue; //******* processedFileQueue **** will be used by other threads to retrive unlocked files.. so I have // kept as public final public final BlockingQueue processedFileQueue; private volatile boolean closeProcessingThread; private volatile boolean closeWatcherThread; private void processFiles(){ System.out.println("DirWatchProcessingThread Started"); String fileName; outerLoop: while(!closeProcessingThread || !fileProcessingQueue.isEmpty()){ try{ fileName = fileProcessingQueue.poll(1000, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ fileName = null; } if(fileName == null || fileName.equals("")){ continue outerLoop; } long startTime = System.currentTimeMillis(); innerLoop: while(true){ FileInputStream fis = null; File file = new File(fileName); try{ fis = new FileInputStream(fileName); break innerLoop; }catch(FileNotFoundException fnfe){ if(!file.exists() || file.isDirectory()){ System.out.println("File: '"+fileName+"has been deleted in file system or it is not file. Not processing this file."); continue outerLoop; } try{ Thread.sleep(WatchDirParameters.millisToPuaseForFileLock); }catch(InterruptedException ie){ } if((System.currentTimeMillis() - startTime) > WatchDirParameters.millisToSwapFileForUnlocking){ if(fileProcessingQueue.offer(fileName)){ continue outerLoop; }else{ startTime = System.currentTimeMillis(); continue innerLoop; } } }finally{ if(fis != null){ try{ fis.close(); }catch(IOException ioe){ ioe.printStackTrace(); } } } } System.out.println("Queuing File: "+fileName); processedLoop:while(true){ try{ if(processedFileQueue.offer(fileName, 1000, TimeUnit.MILLISECONDS)){ break processedLoop; } }catch(InterruptedException ie){ //ie.printStackTrace(); } } } closeWatcherThread = true; closeProcessingThread = true; System.out.println("DirWatchProcessingThread Exited"); } /** * Process all events for keys queued to the watcher */ private void processEvents(){ System.out.println("DirWatcherThread started."); while(!closeWatcherThread) { // wait for key to be signalled WatchKey key; try { key = watcher.take(); } catch (InterruptedException x) { // if we are returning from these method, it means we no longer wants to watch directory // we must close thread which may be waiting for file names in queue continue; }catch(ClosedWatchServiceException cwse){ break; } Path dir = keys.get(key); if (dir == null) { System.err.println("WatchKey not recognized!!"); continue; } try{ for (WatchEvent event: key.pollEvents()) { WatchEvent.Kind kind = event.kind(); if (kind == OVERFLOW) { continue; } // Context for directory entry event is the file name of entry WatchEvent ev = cast(event); Path name = ev.context(); Path child = dir.resolve(name); if(kind.equals(ENTRY_CREATE)){ // if directory is created, and watching recursively, then // register it and its sub-directories if (recursive) { try { if (Files.isDirectory(child, NOFOLLOW_LINKS)) { registerAll(child); continue; } } catch (IOException x) { // ignore to keep sample readbale } } while(true){ if(fileProcessingQueue.remainingCapacity() < 2){ // if only one last can be inserted then don't queue this we need 1 empty space in queue // for swaping file names.. // sleep for some time so processing thread may have made some rooms to queue in fileQueue // this logic will not create any problems as only one this thread is inserting in queue try{ Thread.sleep(200); }catch(InterruptedException ie){ } continue; } if(!fileProcessingQueue.offer(child.toString())){ // couldn't queue this element by whatever reason.. we will try to enqueue again by continuing loop continue; }else{ // file name has been queued in queue break; } } } } // reset key and remove from set if directory no longer accessible boolean valid = key.reset(); if (!valid) { keys.remove(key); // all directories are inaccessible if (keys.isEmpty()) { break; } } }catch(ClosedWatchServiceException cwse){ break; } } closeProcessingThread = true; closeWatcherThread = true; System.out.println("DirWatcherThread exited."); } public void stopWatching(){ try{ watcher.close(); }catch(IOException ioe){ } closeProcessingThread = true; closeWatcherThread = true; } public static WatchDir watchDirectory(String dirName, boolean recursive) throws InvalidPathException, IOException, Exception{ try{ Path dir = Paths.get(dirName); final WatchDir watchDir = new WatchDir(dir, recursive); watchDir.closeProcessingThread = false; watchDir.closeWatcherThread = false; new Thread(new Runnable() { public void run() { watchDir.processFiles(); } }, "DirWatchProcessingThread").start(); new Thread(new Runnable() { public void run() { watchDir.processEvents(); } }, "DirWatcherThread").start(); return watchDir; }catch(InvalidPathException ipe){ throw ipe; }catch(IOException ioe){ throw ioe; }catch(Exception e){ throw e; } } @SuppressWarnings("unchecked") private static  WatchEvent cast(WatchEvent event) { return (WatchEvent)event; } /** * Register the given directory with the WatchService */ private void register(Path dir) throws IOException { //WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); WatchKey key = dir.register(watcher, ENTRY_CREATE); if (trace) { Path prev = keys.get(key); if (prev == null) { System.out.format("register: %s\n", dir); } else { if (!dir.equals(prev)) { System.out.format("update: %s -> %s\n", prev, dir); } } } keys.put(key, dir); } /** * Register the given directory, and all its sub-directories, with the * WatchService. */ private void registerAll(final Path start) throws IOException { // register directory and sub-directories Files.walkFileTree(start, new SimpleFileVisitor() { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { register(dir); return FileVisitResult.CONTINUE; } }); } /** * Creates a WatchService and registers the given directory */ private WatchDir(Path dir, boolean recursive) throws IOException { fileProcessingQueue = new ArrayBlockingQueue(WatchDirParameters.fileQueueSize, false); processedFileQueue = new ArrayBlockingQueue(WatchDirParameters.fileQueueSize, false); this.watcher = FileSystems.getDefault().newWatchService(); this.keys = new HashMap(); this.recursive = recursive; //CreateTxtFile.createFile(dir, 1); if (recursive) { System.out.format("Scanning %s ...\n", dir); registerAll(dir); System.out.println("Done."); } else { register(dir); } // enable trace after initial registration this.trace = true; } } 

参数类:

 package dirwatch; public class WatchDirParameters { public static final int millisToPuaseForFileLock = 200; public static final int fileQueueSize = 500; public static final int millisToSwapFileForUnlocking = 2000; } 

制作了由@UDPLover提供的文件的更新版本,该文件是为在高速文件访问环境中使用而构建的,我将进程队列转换为HashMap以携带监视事件以传递给抽象方法在文件内部阻止检查器本身。 还制作了一个print()方法,允许启用或禁用WatchCore打印到控制台的任何内容。 更新了原始示例中的循环文件轮询以使用JDK8函数进行循环,使所有部分都进行了线程化/中断。 这是尚未经过测试的,当我可以测试它时会更新修复程序。

 package filewatcher; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.nio.file.ClosedWatchServiceException; import java.nio.file.FileSystems; import java.nio.file.FileVisitResult; import java.nio.file.Files; import static java.nio.file.LinkOption.NOFOLLOW_LINKS; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.OVERFLOW; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author Nackloose * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java */ public abstract class WatchCore extends Thread { //make class a thread by default /** * After the WatchCore recieves an event for a file and deems it unlocked, * it will be passed to this function * * @param e WatchEvent for the file, after it has been affirmed to be * unlocked. */ public abstract void onEventAndUnlocked(WatchEvent e); private final WatchService watcher; private final Map keys; private final boolean recursive; private boolean trace = false; //converted to HashMap to remove the limitation as I need this in a high rate of file access enviroment. //as well as to carry the event passed for that folder into the block check itself. //got rid of the finished queue and made events pass to the abstract void above private final HashMap fileProcessingQueue; //create a varible to keep track of the thread checking the file blocking, so we can start and stop it. private final WatchBlocker blocker; public WatchCore(String dir) throws IOException { //defaultly dont recurse this(dir, false); } public WatchCore(String dir, boolean recursive) throws IOException { this(Paths.get(dir), recursive); } public WatchCore(Path dir) throws IOException { //defaultly dont recurse this(dir, false); } public WatchCore(Path dir, boolean recursive) throws IOException { fileProcessingQueue = new HashMap<>(); this.watcher = FileSystems.getDefault().newWatchService(); this.keys = new HashMap<>(); this.recursive = recursive; //CreateTxtFile.createFile(dir, 1); if (recursive) { print("Scanning %s ...", dir); registerAll(dir); print("Done."); } else { register(dir); } // enable trace after initial registration this.trace = true; //start the thread to process files to be checked for file blocking blocker = new WatchBlocker(); } @SuppressWarnings("unchecked") private static  WatchEvent cast(WatchEvent event) { return (WatchEvent) event; } @Override public synchronized void start() { //start up our thread _FIRST_ super.start(); //then start the blocking thread blocker.start(); } @Override public void interrupt() { //Everything backwards, stop the blocker _FIRST_ blocker.interrupt(); //then stop our thread. super.interrupt(); } /** * Register the given directory with the WatchService */ private void register(Path dir) throws IOException { //WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); WatchKey key = dir.register(watcher, ENTRY_CREATE); if (trace) { Path prev = keys.get(key); if (prev == null) { print("register: %s\n", dir); } else { if (!dir.equals(prev)) { print("update: %s -> %s\n", prev, dir); } } } keys.put(key, dir); } /** * Register the given directory, and all its sub-directories, with the * WatchService. */ private void registerAll(final Path start) throws IOException { // register directory and sub-directories Files.walkFileTree(start, new SimpleFileVisitor() { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { register(dir); return FileVisitResult.CONTINUE; } }); } /** * Process all events for keys queued to the watcher */ @Override public void run() { //this was previous called processEvents() //pruned any un-nessicary continues, labels, and labels on breaks, a lot of them //were redundant print("DirWatcherThread started."); //as long as we're not interrupted we keep working while (!interrupted()) { // wait for key to be signalled WatchKey key; try { key = watcher.take(); } catch (InterruptedException x) { // if we are returning from these method, it means we no longer wants to watch directory // we must close thread which may be waiting for file names in queue continue; } catch (ClosedWatchServiceException cwse) { break; } Path dir = keys.get(key); if (dir == null) { printe("WatchKey not recognized!!"); continue; } try { //converted to functional for loop. key.pollEvents().stream().filter((event) -> { WatchEvent.Kind kind = event.kind(); return !(kind == OVERFLOW); //make sure we do the filter }).forEach((event) -> { WatchEvent.Kind kind = event.kind(); // Context for directory entry event is the file name of entry WatchEvent ev = cast(event); Path name = ev.context(); Path child = dir.resolve(name); if (kind.equals(ENTRY_CREATE)) { // if directory is created, and watching recursively, then // register it and its sub-directories if (recursive) { try { if (Files.isDirectory(child, NOFOLLOW_LINKS)) { registerAll(child); return; //continue; } } catch (IOException x) { // ignore to keep sample readbale } } fileProcessingQueue.put(child.toString(), ev); } }); // reset key and remove from set if directory no longer accessible boolean valid = key.reset(); if (!valid) { keys.remove(key); // all directories are inaccessible if (keys.isEmpty()) { break; } } } catch (ClosedWatchServiceException cwse) { break; } } print("DirWatcherThread exited."); } /** * * @author * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java * Nackloose */ private class WatchBlocker extends Thread { @Override public synchronized void start() { //get it going super.start(); } @Override public void interrupt() { //interupt our thread super.interrupt(); } @Override public void run() { //this was perviously processFiles() //pruned any un-nessicary continues, labels, and labels on breaks, a lot of them //were redundant print("DirWatchProcessingThread Started"); Entry fileEvent; outerLoop: //as long as we're not interrupted we keep working while (!interrupted()) { if (fileProcessingQueue.isEmpty()) { try { Thread.sleep(WatchCoreParameters.timeToIdle); } catch (InterruptedException ex) { Logger.getLogger(WatchCore.class.getName()).log(Level.SEVERE, null, ex); } continue; } fileEvent = fileProcessingQueue.entrySet().iterator().next(); fileProcessingQueue.remove(fileEvent.getKey()); long startTime = System.currentTimeMillis(); while (true) { FileInputStream fis = null; File file = new File(fileEvent.getKey()); try { fis = new FileInputStream(fileEvent.getKey()); break; } catch (FileNotFoundException fnfe) { if (!file.exists() || file.isDirectory()) { print("File: '" + fileEvent + "has been deleted in file system or it is not file. Not processing this file."); continue outerLoop; } try { Thread.sleep(WatchCoreParameters.millisToPauseForFileLock); } catch (InterruptedException ie) { } if ((System.currentTimeMillis() - startTime) > WatchCoreParameters.millisToSwapFileForUnlocking) { fileProcessingQueue.put(fileEvent.getKey(), fileEvent.getValue()); } } finally { if (fis != null) { try { fis.close(); } catch (IOException ioe) { ioe.printStackTrace(); } } } } print("Queuing File: " + fileEvent); //pass the unlocked file event to the abstract method onEventAndUnlocked(fileEvent.getValue()); } print("DirWatchProcessingThread Exited"); } } /** * * @author * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java * Nackloose */ public static class WatchCoreParameters { public static int timeToIdle = 2000, millisToPauseForFileLock = 200, millisToSwapFileForUnlocking = 2000; public static boolean verbose = false; } // private void print(String s) { //defaultly we're not writing an error print(s, false); } public static final void print(String s, boolean error) { //check verbosity, exit if none. if (!WatchCoreParameters.verbose) { return; } //if this is an error, assign System.err to a temp varible //otherise assign System.out for normal printing PrintStream out = (!error ? System.out : System.err); if (s.contains("\n")) { // check to see if theirs a new line out.print(s); //print accordingly } else { out.println(s); //print accordingly } } public static final void printe(String s) { //shortcut/convenience method for printing an error print(s, true); } public static final void print(String s, Object... formatObj) { //check verbosity, exit if none. if (!WatchCoreParameters.verbose) { return; } //format the object into the string, and if no newline is there, add it. System.out.format(s + (s.contains("\n") ? "" : "\n"), formatObj); } // }