如何正确处理两个线程更新数据库中的同一行

我有一个名为T1的线程用于读取平面文件并解析它。 我需要创建一个名为T2的新线程来解析这个文件的某些部分,稍后这个T2线程需要更新原始实体的状态,原始线程T1也会对其进行解析和更新。我该如何处理这个问题情况?

我收到一个包含以下样本记录的平面文件:

 AAAA BBBB AACC BBCC AADD BBDD 

首先,此文件以“已Received状态保存在数据库中。 现在,所有以BBAA开头的记录都需要在一个单独的线程中处理。 一旦成功解析,两个线程都会尝试将数据库中此文件对象的状态更新为Parsed 。 在某些情况下,我得到staleObjectException编辑:在exception丢失之前,任何线程完成的工作。 我们正在使用乐观锁定。 避免这个问题的最佳方法是什么?

两个线程更新同一个对象时可能出现的hibernateexception?

上面的post有助于理解它的某些部分,但它无助于解决我的问题。

第1部分 – 你的问题 – 我看到它的方式。

您收到此exception的主要原因是您正在使用可能乐观锁定的 Hibernate。 这基本上告诉你线程T1或线程T2已经将状态更新为PARSED,现在另一个线程持有旧版本的行,其版本比数据库中保存的版本小,并尝试将状态更新为PARSED 。

这里最大的问题是“ 两个线程是否试图保留相同的数据 ?”。 如果这个问题的答案是肯定的,那么即使最后一次更新成功,也应该没有任何问题,因为最终他们将行更新为相同的状态。 在这种情况下,您实际上不需要乐观锁定,因为您的数据在任何情况下都是同步的。

如果在重置到下一个状态时两个线程T1和T2实际上彼此依赖,则在状态设置为RECIEVED之后出现主要问题。 在这种情况下,您需要确保如果T1已首先执行(反之亦然),T2需要刷新更新行的数据并根据T1已推送的更改重新应用其更改。 在这种情况下,解决方案如下。 如果遇到staleObjectException,则基本上需要从数据库刷新数据并重新启动操作。

第2部分关于链接的分析 当两个线程更新同一个对象时,可能的hibernateexception? 方法1 ,这或多或少是最后更新Wins的情况。 它或多或少地避免了乐观锁定(版本计数)。 如果您没有从T1到T2的依赖关系或反向以设置状态PARSED。 这应该是好的。

**** Aproach 2 **乐观锁定**这就是你现在拥有的。 解决方案是刷新数据并重新启动操作。

Aproach 3行级DB锁定这里的解决方案与方法2的解决方案大致相同,只有悲观锁定的小修正。 主要区别在于,在这种情况下,它可能是一个READ锁,如果它是PESSIMISTIC READ,您可能甚至无法从数据库中读取数据以刷新它。

Aproach 4应用程序级同步有许多不同的方法可以进行同步。 一个示例是将所有更新实际安排在BlockingQueue或JMS队列中(如果您希望它是持久的)并从单个线程推送所有更新。 为了使它可视化,T1和T2将把元素放在队列上,并且将有一个T3线程读取操作并将它们推送到数据库服务器。

如果使用应用程序级同步,则应注意不能在多服务器部署中分发所有结构。

好吧,我现在想不出别的:)

我不确定我理解这个问题,但似乎它会构成一个线程T1的逻辑错误,它只处理,例如,以AA开头的记录将整个文件标记为“Parsed”? 例如,如果您的应用程序在T1更新后崩溃但T2仍在处理BB记录时会发生什么? 有些BB记录可能会丢失,对吗?

无论如何,问题的关键在于你有一个竞争条件,两个线程更新同一个对象。 陈旧的对象exception只是意味着你的一个线程失去了竞争。 更好的解决方案完全避免了比赛。

(我在这里假设个别记录处理是幂等的,如果不是这种情况我认为你有更大的问题,因为一些失败模式将导致记录的重新处理。如果记录处理必须发生一次且只发生一次,那么你有一个更难的问题,消息队列可能是一个更好的解决方案。)

我将利用java.util.concurrent的function将记录分发给线程工作者,并让线程与hibernate块交互,直到所有记录都被处理完为止,此时该线程可以将文件标记为“Parsed”。

例如,

 // do something like this during initialization, or use a Guava LoadingCache... Map executors = new HashMap<>(); // note I'm assuming RecordType looks like an enum executors.put(RecordType.AA_RECORD, Executors.newSingleThreadExecutor()); 

然后在处理文件时,按如下方式调度每条记录,构建与排队任务状态对应的期货列表。 让我们假设成功处理一条记录返回一个布尔值“true”:

 List> tasks = new ArrayList<>(); for (Record record: file.getRecords()) { Executor executorForRecord = executors.get(record.getRecordType()); tasks.add(executor.submit(new RecordProcessor(record))); } 

现在等待所有任务成功完成 – 有更优雅的方法来做到这一点,特别是与番石榴。 请注意,如果您的任务因exception而失败,您还需要处理ExecutionException,我在此处对此进行了修改。

 boolean allSuccess = true; for (Future task: tasks) { allSuccess = allSuccess && task.get(); if (!allSuccess) break; } // if all your tasks completed successfully, update the file record if (allSuccess) { file.setStatus("Parsed"); } 

假设每个线程T1,T2将解析文件的不同部分,意味着没有人覆盖其他线程解析。 最好的办法是解析过程与数据库提交分离。

T1,T2将执行解析T3或主线程将在T1,T2完成后执行提交。 我认为在这种方法中,只有当两个线程都完成时才更正确地将文件状态更改为Parsed

您可以将T3视为CommitService类,等待T1,T2 finsih然后提交到DB

CountDownLatch是一个有用的工具。 这是一个例子