通知postgres对java应用程序的更改
问题
我正在为几十万种产品建立一个postgres数据库 。 我将设置一个索引(Solr或ElasticSearch)来改善复杂搜索查询的查询时间。
现在的重点是如何让索引与数据库同步?
在过去, 我有一种应用程序定期轮询数据库以检查应该完成的更新 ,但我会有一个过时的索引状态时间 (从数据库更新到索引更新拉动)。
我更喜欢一种解决方案,其中数据库将通知我的应用程序 (Java应用程序)数据库中的某些内容已被更改,此时应用程序将决定是否需要更新索引。 为了更准确,我将构建一种生产者和消费者结构,希望副本将从postgres接收有关更改的通知,如果这与索引的数据相关,则将其存储在待执行更新的堆栈中。 消费者将使用此堆栈并构建要存储到索引中的文档。
可能的解决方案
一种解决方案是编写一种副本端点 ,其中应用程序将表现为用于复制原始数据库中的数据的postgres实例。 有人对这种方法有一些经验吗?
我对此问题有哪些其他解决方案?
我对此问题有哪些其他解决方案?
使用LISTEN
和NOTIFY
告诉您的应用程序已发生变化。
您可以从也记录队列表中的更改的触发器发送NOTIFY
。
你需要一个PgJDBC连接,它已经为你正在使用的事件发送了一个LISTEN
。 如果您使用SSL,它必须通过定期发送空查询( ""
)来轮询数据库; 如果您不使用SSL,则可以通过使用异步通知检查来避免这种情况。 您需要从连接池中解包Connection
对象,以便能够将基础连接转换为PgConnection
以使用listen / notify。 见相关答案
生产者/消费者的位将更难。 要在PostgreSQL中拥有多个崩溃安全的并发使用者,您需要使用pg_try_advisory_lock(...)
建议锁定。 如果你不需要并发消费者那么这很容易,你只需要SELECT ... LIMIT 1 FOR UPDATE
一行。
希望9.4将包含一个更简单的方法,用FOR UPDATE
跳过锁定的行,因为它正在开发中。
要使用postgres的LISTEN和NOTIFY,您需要使用可支持异步通知的驱动程序。 postgres JDBC驱动程序不支持异步通知。
要不断地从Application Server通过一个通道,请转到pgjdbc-ng 0.6驱动程序。
http://impossibl.github.io/pgjdbc-ng/
它支持异步通知,无需轮询。
一般来说,我建议使用EAI模式实现松散耦合。 然后,如果您决定交换数据库,则索引端的代码不会更改。
如果您想坚持使用紧耦合,我建议使用LISTEN / NOTIFY 。 在Java中,使用pgjdbc-ng驱动程序很重要,因为它支持没有轮询的异步通知。
这是一个异步模式(基于这个答案 ):
import com.impossibl.postgres.api.jdbc.PGConnection; import com.impossibl.postgres.api.jdbc.PGNotificationListener; import com.impossibl.postgres.jdbc.PGDataSource; import java.sql.Statement; public static void listenToNotifyMessage() { PGDataSource dataSource = new PGDataSource(); dataSource.setHost("localhost"); dataSource.setPort(5432); dataSource.setDatabase("database_name"); dataSource.setUser("postgres"); dataSource.setPassword("password"); PGNotificationListener listener = (int processId, String channelName, String payload) -> System.out.println("notification = " + payload); try (PGConnection connection = (PGConnection) dataSource.getConnection()) { Statement statement = connection.createStatement(); statement.execute("LISTEN test"); statement.close(); connection.addNotificationListener(listener); // it only works if the connection is open. Therefore, we do an endless loop here. while (true) { Thread.sleep(500); } } catch (Exception e) { System.err.println(e); } }
在其他语句中,您现在可以执行NOTIFY test, 'This is a payload';
。 您还可以在触发器等中执行NOTIFY
。