如何找到哪个Java / Scala线程锁定了文件?
简单来说:
- 如何找到哪个Java / Scala线程锁定了文件? 我知道JVM中的一个类/线程已经锁定了一个具体文件(重叠了一个文件区域),但我不知道如何。 当我在断点中停止应用程序时,有可能找出正在执行此操作的类/线程吗?
以下代码抛出OverlappingFileLockException :
FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND).tryLock().isValid(); FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND).tryLock()..isShared();
- Java / Scala如何锁定此文件( Spark )? 我知道如何使用java.nio.channels锁定文件,但我没有在Spark的github存储库中找到适当的调用。
关于我的问题的更多信息: 1。当我在Windows操作系统中使用Hive运行Spark时,它可以正常工作,但是每次Spark关闭时,它都无法删除一个临时目录(在此之前的其他临时目录被正确删除)并输出以下exception:
2015-12-11 15:04:36 [Thread-13] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext 2015-12-11 15:04:36 [Thread-13] INFO oaspark.util.ShutdownHookManager - Shutdown hook called 2015-12-11 15:04:36 [Thread-13] INFO oaspark.util.ShutdownHookManager - Deleting directory C:\Users\MyUser\AppData\Local\Temp\spark-9d564520-5370-4834-9946-ac5af3954032 2015-12-11 15:04:36 [Thread-13] INFO oaspark.util.ShutdownHookManager - Deleting directory C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041 2015-12-11 15:04:36 [Thread-13] ERROR oaspark.util.ShutdownHookManager - Exception while deleting Spark temp dir: C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041 java.io.IOException: Failed to delete: C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:884) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:63) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:60) [spark-core_2.11-1.5.0.jar:1.5.0] at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) [scala-library-2.11.6.jar:na] at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:60) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at scala.util.Try$.apply(Try.scala:191) [scala-library-2.11.6.jar:na] at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216) [spark-core_2.11-1.5.0.jar:1.5.0] at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) [hadoop-common-2.4.1.jar:na]
我尝试在互联网上进行搜索,但发现Spark 正在进行中的问题(一个用户尝试做一些补丁,但是如果我正确地对这个拉取请求进行注释,它就无法正常工作)以及SO中的一些未解答的问题。
看起来问题出在Utils.scala类的deleteRecursively()方法中。 我将断点设置为此方法并将其重写为Java:
public class Test { public static void deleteRecursively(File file) { if (file != null) { try { if (file.isDirectory()) { for (File child : listFilesSafely(file)) { deleteRecursively(child); } //ShutdownHookManager.removeShutdownDeleteDir(file) } } finally { if (!file.delete()) { if (file.exists()) { throw new RuntimeException("Failed to delete: " + file.getAbsolutePath()); } } } } } private static List listFilesSafely(File file) { if (file.exists()) { File[] files = file.listFiles(); if (files == null) { throw new RuntimeException("Failed to list files for dir: " + file); } return Arrays.asList(files); } else { return Collections.emptyList(); } } public static void main(String [] arg) { deleteRecursively(new File("C:\\Users\\MyUser\\AppData\\Local\\Temp\\spark-9ba0bb0c-1e20-455d-bc1f-86c696661ba3")); }
当Spark在此方法的断点处停止时,我发现Spark的一个线程的JVM锁定了“C:\ Users \ MyUser \ AppData \ Local \ Temp \ spark-9ba0bb0c-1e20-455d-bc1f-86c696661ba3 \ metastore \ db .lck“文件和Windows Process Explorer也显示Java锁定此文件。 FileChannel也显示该文件在JVM中被锁定。
现在,我必须:
-
找出哪个线程/类已锁定此文件
-
找出锁定文件的方法Spark用于锁定“metastore \ db.lck”,它是什么类以及如何在关机前解锁它
-
在调用deleteRecursively()方法之前对Spark或Hive执行拉取请求以解锁此文件(“metastore \ db.lck”)或至少留下关于问题的注释
如果您需要任何其他信息,请在评论中提问。
- 如何找到哪个Java / Scala线程锁定了文件?
我有一些问题,并找出这个解决方案:至少在Thread.threadLocals字段中你可以看到所有锁定的对象。
如果文件锁定以下代码:
File newFile = new File("newFile.lock"); newFile.createNewFile(); FileLock fileLock = FileChannel.open(Paths.get(newFile.getAbsolutePath()), StandardOpenOption.APPEND).tryLock();
在Thread.threadLocals
您可以看到带有字段owner
=“… / newFile.lock”的sun.nio.fs.NativeBuffer
类。
所以你可以尝试下面的代码,它返回所有线程与threadLocals中的所有类,你需要找到什么Threads有NativeBuffer类或Spark / Hive对象等等(并在Eclipse或IDEA调试模式下检查这个线程的threadLocals):
private static String getThreadsLockFile() { Set threads = Thread.getAllStackTraces().keySet(); StringBuilder builder = new StringBuilder(); for (Thread thread : threads) { builder.append(getThreadsLockFile(thread)); } return builder.toString(); } private static String getThreadsLockFile(Thread thread) { StringBuffer stringBuffer = new StringBuffer(); try { Field field = thread.getClass().getDeclaredField("threadLocals"); field.setAccessible(true); Object map = field.get(thread); Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table"); table.setAccessible(true); Object tbl = table.get(map); int length = Array.getLength(tbl); for (int i = 0; i < length; i++) { try { Object entry = Array.get(tbl, i); if (entry != null) { Field valueField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getDeclaredField("value"); valueField.setAccessible(true); Object value = valueField.get(entry); if (value != null) { stringBuffer.append(thread.getName()).append(" : ").append(value.getClass()). append(" ").append(value).append("\n"); } } } catch (Exception exp) { // skip, do nothing } } } catch (Exception exp) { // skip, do nothing } return stringBuffer.toString(); }
或者您可以尝试使用以下代码,但此代码仅NativeBuffer
带有owner
参数的NativeBuffer
类(因此它并不适用于所有情况):
private static String getThreadsLockFile(String fileName) { Set threads = Thread.getAllStackTraces().keySet(); StringBuilder builder = new StringBuilder(); for (Thread thread : threads) { builder.append(getThreadsLockFile(thread, fileName)); } return builder.toString(); } private static String getThreadsLockFile(Thread thread, String fileName) { StringBuffer stringBuffer = new StringBuffer(); try { Field field = thread.getClass().getDeclaredField("threadLocals"); field.setAccessible(true); Object map = field.get(thread); Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table"); table.setAccessible(true); Object tbl = table.get(map); int length = Array.getLength(tbl); for (int i = 0; i < length; i++) { try { Object entry = Array.get(tbl, i); if (entry != null) { Field valueField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getDeclaredField("value"); valueField.setAccessible(true); Object value = valueField.get(entry); if (value != null) { int length1 = Array.getLength(value); for (int j = 0; j < length1; j++) { try { Object entry1 = Array.get(value, j); Field ownerField = Class.forName("sun.nio.fs.NativeBuffer").getDeclaredField("owner"); ownerField.setAccessible(true); String owner = ownerField.get(entry1).toString(); if (owner.contains(fileName)) { stringBuffer.append(thread.getName()); } } catch (Exception exp) { // skip, do nothing } } } } } catch (Exception exp) { // skip, do nothing } } } catch (Exception exp) { // skip, do nothing } return stringBuffer.toString(); }
请参阅如何找出哪个线程在java中锁定文件?
Windows进程锁定了文件。 线程可以打开文件来读写,但是一个包含对文件句柄的引用的类负责关闭它。 因此,您应该寻找一个对象,而不是一个线程。
请参阅如何确定未固定对象的内容? 找出方法。
我告诉你我发现的关于我自己的猜测没有其他答案的信息(非常感谢Basilevs , tploter ),可能在同样的情况下帮助某人:
-
每当JVM线程独占锁定文件时,JVM也会锁定一些Jave对象 ,例如,我发现在我的情况下:
- sun.nio.fs.NativeBuffer
- sun.nio.ch.Util $ BufferCache
因此,您只需要找到这个锁定的Java对象并对其进行分析,然后找到哪个线程锁定了您的文件 。
我不确定文件是否只是打开(没有锁定),但我确信如果文件被Thread专门锁定(使用java.nio.channels.FileLock , java.nio.channels.FileChannel等)上)
- 不幸的是,关于Spark,我发现了很多其他锁定的
Hive
对象(org.apache.hadoop.hive.ql.metadata.Hive
,org.apache.hadoop.hive.metastore.ObjectStore
,org.apache.hadoop.hive.ql.session.SessionState
,org.apache.hadoop.hive.ql.metadata.Hive
等)当Spark
尝试删除db.lck时,这意味着Spark
没有正确关闭Hive
,在它尝试之前删除Hive's
文件。 幸运的是,Linux OS
没有此问题(可能是Linux
允许删除锁定文件)。
- 带有DataFrame API的Apache Spark MLlib在createDataFrame()或read()时会产生java.net.URISyntaxException .csv(…)
- Spark DataFrame并重命名多个列(Java)
- 使用Apache Spark和Java将CSV解析为DataFrame / DataSet
- 线程主java.lang.exceptionininitializerError中的exception当没有hadoop安装spark时
- 使用Apache Spark从Amazon S3解析文件
- 运行apache spark job时,任务不可序列化exception
- 如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?
- 如何更新火花流中的广播变量?
- 如何在Spark RDD(Java)中通过索引获取元素