通知postgres对java应用程序的更改

问题

我正在为几十万种产品建立一个postgres数据库 。 我将设置一个索引(Solr或ElasticSearch)来改善复杂搜索查询的查询时间。

现在的重点是如何让索引与数据库同步?

在过去, 我有一种应用程序定期轮询数据库以检查应该完成的更新 ,但我会有一个过时的索引状态时间 (从数据库更新到索引更新拉动)。

我更喜欢一种解决方案,其中数据库将通知我的应用程序 (Java应用程序)数据库中的某些内容已被更改,此时应用程序将决定是否需要更新索引。 为了更准确,我将构建一种生产者和消费者结构,希望副本将从postgres接收有关更改的通知,如果这与索引的数据相关,则将其存储在待执行更新的堆栈中。 消费者将使用此堆栈并构建要存储到索引中的文档。

可能的解决方案

一种解决方案是编写一种副本端点 ,其中应用程序将表现为用于复制原始数据库中的数据的postgres实例。 有人对这种方法有一些经验吗?

我对此问题有哪些其他解决方案?

我对此问题有哪些其他解决方案?

使用LISTENNOTIFY告诉您的应用程序已发生变化。

您可以从也记录队列表中的更改的触发器发送NOTIFY

你需要一个PgJDBC连接,它已经为你正在使用的事件发送了一个LISTEN 。 如果您使用SSL,它必须通过定期发送空查询( "" )来轮询数据库; 如果您不使用SSL,则可以通过使用异步通知检查来避免这种情况。 您需要从连接池中解包Connection对象,以便能够将基础连接转换为PgConnection以使用listen / notify。 见相关答案

生产者/消费者的位将更难。 要在PostgreSQL中拥有多个崩溃安全的并发使用者,您需要使用pg_try_advisory_lock(...)建议锁定。 如果你不需要并发消费者那么这很容易,你只需要SELECT ... LIMIT 1 FOR UPDATE一行。

希望9.4将包含一个更简单的方法,用FOR UPDATE跳过锁定的行,因为它正在开发中。

要使用postgres的LISTEN和NOTIFY,您需要使用可支持异步通知的驱动程序。 postgres JDBC驱动程序不支持异步通知。

要不断地从Application Server通过一个通道,请转到pgjdbc-ng 0.6驱动程序。

http://impossibl.github.io/pgjdbc-ng/

它支持异步通知,无需轮询。

一般来说,我建议使用EAI模式实现松散耦合。 然后,如果您决定交换数据库,则索引端的代码不会更改。

如果您想坚持使用紧耦合,我建议使用LISTEN / NOTIFY 。 在Java中,使用pgjdbc-ng驱动程序很重要,因为它支持没有轮询的异步通知。

这是一个异步模式(基于这个答案 ):

 import com.impossibl.postgres.api.jdbc.PGConnection; import com.impossibl.postgres.api.jdbc.PGNotificationListener; import com.impossibl.postgres.jdbc.PGDataSource; import java.sql.Statement; public static void listenToNotifyMessage() { PGDataSource dataSource = new PGDataSource(); dataSource.setHost("localhost"); dataSource.setPort(5432); dataSource.setDatabase("database_name"); dataSource.setUser("postgres"); dataSource.setPassword("password"); PGNotificationListener listener = (int processId, String channelName, String payload) -> System.out.println("notification = " + payload); try (PGConnection connection = (PGConnection) dataSource.getConnection()) { Statement statement = connection.createStatement(); statement.execute("LISTEN test"); statement.close(); connection.addNotificationListener(listener); // it only works if the connection is open. Therefore, we do an endless loop here. while (true) { Thread.sleep(500); } } catch (Exception e) { System.err.println(e); } } 

在其他语句中,您现在可以执行NOTIFY test, 'This is a payload'; 。 您还可以在触发器等中执行NOTIFY