
我一直在编写一个监视目录的程序,当在其中创建文件时,它会更改名称并将它们移动到新目录。 在我的第一个实现中,我使用了Java的Watch Service API,当我测试1kb文件时,它运行良好。 出现的问题是,实际上创建的文件大小在50-300mb之间。 当发生这种情况时,观察者API会立即找到该文件,但由于它仍在被写入,因此无法移动它。 我尝试将观察者放在一个循环中(生成exception直到文件可以移动)但这看起来效率很低。

由于这不起作用,我尝试使用定时器,每隔10秒检查一次文件夹,然后尽可能移动文件。 这是我最终选择的方法。

问题:无论如何在没有进行exception检查或不断比较大小的情况下发出文件写入信号? 我喜欢为每个文件使用Watcher API一次,而不是使用计时器不断检查(并运行exception)。



写另一个文件作为原始文件完成的指示。 Ig’fileorg.dat’正在增长,如果完成创建文件’fileorg.done’并仅检查’fileorg.done’。


我今天遇到了同样的问题。 我在实际导入文件之前使用了一小段延迟并不是一个大问题,我仍然想使用NIO2 API。 我选择的解决方案是等待文件在10秒内没有被修改,然后对其执行任何操作。

实施的重要部分如下。 程序等待,直到等待时间到期或发生新事件。 每次修改文件时都会重置到期时间。 如果在等待时间到期之前删除文件,则会从列表中删除该文件。 我将poll方法与预期到期时间的超时一起使用,即(lastmodified + waitTime)-currentTime

 private final Map expirationTimes = newHashMap(); private Long newFileWait = 10000L; public void run() { for(;;) { //Retrieves and removes next watch key, waiting if none are present. WatchKey k = watchService.take(); for(;;) { long currentTime = new DateTime().getMillis(); if(k!=null) handleWatchEvents(k); handleExpiredWaitTimes(currentTime); // If there are no files left stop polling and block on .take() if(expirationTimes.isEmpty()) break; long minExpiration = min(expirationTimes.values()); long timeout = minExpiration-currentTime; logger.debug("timeout: "+timeout); k = watchService.poll(timeout, TimeUnit.MILLISECONDS); } } } private void handleExpiredWaitTimes(Long currentTime) { // Start import for files for which the expirationtime has passed for(Entry entry : expirationTimes.entrySet()) { if(entry.getValue()<=currentTime) { logger.debug("expired "+entry); // do something with the file expirationTimes.remove(entry.getKey()); } } } private void handleWatchEvents(WatchKey k) { List> events = k.pollEvents(); for (WatchEvent event : events) { handleWatchEvent(event, keys.get(k)); } // reset watch key to allow the key to be reported again by the watch service k.reset(); } private void handleWatchEvent(WatchEvent event, Path dir) throws IOException { Kind kind = event.kind(); WatchEvent ev = cast(event); Path name = ev.context(); Path child = dir.resolve(name); if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE) { // Update modified time FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime(); expirationTimes.put(name, lastModified.toMillis()+newFileWait); } if (kind == ENTRY_DELETE) { expirationTimes.remove(child); } } 



对不完整的文件使用唯一的前缀。 像而不是 。 上传/创建完成后重命名文件。 从手表中排除.inc文件。

第二种是在同一驱动器上使用不同的文件夹来创建/上传/写入文件,并在准备好后将它们移动到监视文件夹。 如果它们位于同一个驱动器上,那么移动应该是一个primefaces操作(我想是依赖于文件系统)。




 if (kind == ENTRY_CREATE) { System.out.println("Creating file: " + child); boolean isGrowing = false; Long initialWeight = new Long(0); Long finalWeight = new Long(0); do { initialWeight = child.toFile().length(); Thread.sleep(1000); finalWeight = child.toFile().length(); isGrowing = initialWeight < finalWeight; } while(isGrowing); System.out.println("Finished creating file!"); } 

在创建文件时,它将变得越来越大。 所以我所做的是比较一秒钟分开的重量。 应用程序将处于循环中,直到两个权重相同。

虽然当SO完成复制时,Watcher Service API无法通知,但所有选项似乎都是“解决”(包括这个!)。




3)等待超时或新事件发生将是一个选项,但是如果系统过载但副本没有完成怎么办? 如果超时值很大,程序会等待很长时间。



 boolean locked = true; while (locked) { RandomAccessFile raf = null; try { raf = new RandomAccessFile(file, "r"); // it will throw FileNotFoundException. It's not needed to use 'rw' because if the file is delete while copying, 'w' option will create an empty file.; // just to make sure everything was copied, goes to the last byte locked = false; } catch (IOException e) { locked = file.exists(); if (locked) { System.out.println("File locked: '" + file.getAbsolutePath() + "'"); Thread.sleep(1000); // waits some time } else { System.out.println("File was deleted while copying: '" + file.getAbsolutePath() + "'"); } } finally { if (raf!=null) { raf.close(); } } } 

看起来Apache Camel通过尝试重命名文件(来处理文件未完成上传的问题。 如果重命名失败,则没有读锁定,但继续尝试。 重命名成功后,将其重命名,然后继续进行预期处理。

请参阅下面的operations.renameFile 。 以下是Apache Camel源代码的链接: GenericFileRenameExclusiveReadLockStrategy.java和

 public boolean acquireExclusiveReadLock( ... ) throws Exception { LOG.trace("Waiting for exclusive read lock to file: {}", file); // the trick is to try to rename the file, if we can rename then we have exclusive read // since its a Generic file we cannot use java.nio to get a RW lock String newName = file.getFileName() + ".camelExclusiveReadLock"; // make a copy as result and change its file name GenericFile newFile = file.copyFrom(file); newFile.changeFileName(newName); StopWatch watch = new StopWatch(); boolean exclusive = false; while (!exclusive) { // timeout check if (timeout > 0) { long delta = watch.taken(); if (delta > timeout) { CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); // we could not get the lock within the timeout period, so return false return false; } } exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath()); if (exclusive) { LOG.trace("Acquired exclusive read lock to file: {}", file); // rename it back so we can read it operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath()); } else { boolean interrupted = sleep(); if (interrupted) { // we were interrupted while sleeping, we are likely being shutdown so return false return false; } } } return true; } 

这是一个非常有趣的讨论,因为这肯定是一个面包和黄油用例:等待创建一个新文件,然后以某种方式对文件做出反应。 这里的竞争条件很有意思,因为这里的高级要求当然是获取一个事件,然后实际获得(至少)文件的读锁定。 对于大型文件或仅仅是大量的文件创建,这可能需要整个工作线程池,它们只是定期尝试获取新创建的文件的锁定,并且当它们成功时,实际上完成工作。 但是,正如我确信NT意识到的那样,人们必须仔细地做到这一点,以使其扩展,因为它最终是一种轮询方法,并且可扩展性和轮询不是两个完美结合的词。

当我实现文件系统观察器来传输上传的文件时,我不得不处理类似的情况。 我为解决这个问题而实施的解决方案包括以下内容:


2-在你的fileProcessor中,你从列表中拾取一个文件并检查它是否被文件系统锁定,如果是,你将得到一个exception,只是捕获这个exception并让你的线程处于等待状态(即10秒),然后重试再次锁定被释放。 处理完文件后,您可以将标志更改为true或将其从地图中删除。



根据您在写入文件后需要移动文件的紧急程度,您还可以检查稳定的上次修改时间戳,并仅移动文件静止。 你需要它保持稳定的时间可能是依赖于实现的,但我认为具有最后修改时间戳的东西在15秒内没有改变应该足够稳定以便移动。

对于linux中的大文件,文件将以.filepart的扩展名进行复制。 您只需要使用commons api检查扩展并注册ENTRY_CREATE事件。 我用我的.csv文件(1GB)测试了这个并添加它工作

 public void run() { try { WatchKey key = myWatcher.take(); while (key != null) { for (WatchEvent event : key.pollEvents()) { if (FilenameUtils.isExtension(event.context().toString(), "filepart")) { System.out.println("Inside the PartFile " + event.context().toString()); } else { System.out.println("Full file Copied " + event.context().toString()); //Do what ever you want to do with this files. } } key.reset(); key = myWatcher.take(); } } catch (InterruptedException e) { e.printStackTrace(); } } 


在我的例子中,文件是通过WebDav(Apache)创建的,并且创建了许多临时文件,但是同一文件也触发了两个 ENTRY_CREATED事件。 第二个ENTRY_CREATED事件表示复制过程已完成。

这是我的示例ENTRY_CREATED事件。 打印绝对文件路径(您的日志可能会有所不同,具体取决于写入文件的应用程序):

 [info] application - /var/www/webdav/.davfs.tmp39dee1 was created [info] application - /var/www/webdav/document.docx was created [info] application - /var/www/webdav/.davfs.tmp054fe9 was created [info] application - /var/www/webdav/document.docx was created [info] application - /var/www/webdav/.DAV/__db.document.docx was created 

如您所见,我为document.docx收到两个ENTRY_CREATED事件。 在第二个事件之后,我知道文件已完成。 在我的情况下,临时文件显然被忽略了。

所以,我有同样的问题,并有以下解决方案为我工作。 早期尝试不成功 – 尝试监视每个文件的“lastModifiedTime”统计信息,但我注意到大文件的大小增长可能暂停一段时间。(大小不会连续变化)

基本思路 – 对于每个事件,创建一个触发器文件(在临时目录中),其名称具有以下格式 –


此文件为空,所有播放仅在名称中。 仅在传递特定持续时间的间隔后才会考虑原始文件,而不更改其“上次修改时间”统计信息。 (注意 – 因为它是文件统计信息,所以没有开销 – > O(1))

– 该触发器文件由不同的服务处理(比如’ FileTrigger ‘)。


  1. 没有睡觉或等待保持系统。
  2. 释放文件监视器以监视其他事件

FileWatcher的代码 –

 val triggerFileName: String = triggerFileTempDir + orifinalFileName + "_" + Files.getLastModifiedTime(Paths.get(event.getFile.getName.getPath)).toMillis + "_0" // creates trigger file in temporary directory val triggerFile: File = new File(triggerFileName) val isCreated: Boolean = triggerFile.createNewFile() if (isCreated) println("Trigger created: " + triggerFileName) else println("Error in creating trigger file: " + triggerFileName) 


  val actualPath : String = "Original file directory here" val tempPath : String = "Trigger file directory here" val folder : File = new File(tempPath) val listOfFiles = folder.listFiles() for (i <- listOfFiles) { // ActualFileName_LastModifiedTime_NumberOfTries val triggerFileName: String = i.getName val triggerFilePath: String = i.toString // extracting file info from trigger file name val fileInfo: Array[String] = triggerFileName.split("_", 3) // 0 -> Original file name, 1 -> last modified time, 2 -> number of tries val actualFileName: String = fileInfo(0) val actualFilePath: String = actualPath + actualFileName val modifiedTime: Long = fileInfo(1).toLong val numberOfTries: Int = fileStats(2).toInt val currentModifiedTime: Long = Files.getLastModifiedTime(Paths.get(actualFilePath)).toMillis val differenceInModifiedTimes: Long = currentModifiedTime - modifiedTime // checks if file has been copied completely(4 intervals of 5 mins each with no modification) if (differenceInModifiedTimes == 0 && numberOfTries == 3) { FileUtils.deleteQuietly(new File(triggerFilePath)) println("Trigger file deleted. Original file completed : " + actualFilePath) } else { var newTriggerFileName: String = null if (differenceInModifiedTimes == 0) { // updates numberOfTries by 1 newTriggerFileName = actualFileName + "_" + modifiedTime + "_" + (numberOfTries + 1) } else { // updates modified timestamp and resets numberOfTries to 0 newTriggerFileName = actualFileName + "_" + currentModifiedTime + "_" + 0 } // renames trigger file new File(triggerFilePath).renameTo(new File(tempPath + newTriggerFileName)) println("Trigger file renamed: " + triggerFileName + " -> " + newTriggerFileName) } } 
