6

问题

我正在为几十万种产品构建一个postgres 数据库。我将建立一个索引(Solr 或者 ElasticSearch)来改进复杂搜索查询的查询时间。

现在的重点是如何让索引与数据库同步?

过去,我有一种应用程序会定期轮询数据库以检查应该完成的更新,但我会有一个过时的索引状态时间(从数据库更新到索引更新拉取)。

我更喜欢这样的解决方案,其中数据库会通知我的应用程序(java 应用程序)数据库中的某些内容已更改,然后应用程序将决定是否需要更新索引。更准确地说,我会构建一种生产者和消费者结构,希望副本能够从 postgres 收到通知,告知发生了一些变化,如果这与索引的数据相关,它会存储在要更新的堆栈中。消费者将使用此堆栈并构建要存储到索引中的文档。

可能的解决方案

一种解决方案是编写一种副本端点,其中应用程序将充当一个 postgres 实例,用于从原始数据库复制数据。有人对这种方法有一些经验吗?

对于这个问题,我还有哪些其他解决方案?

4

3 回答 3

4

Which other solution do I have for this problem?

Use LISTEN and NOTIFY to tell your app that things have changed.

You can send the NOTIFY from a trigger that also records changes in a queue table.

You'll need a PgJDBC connection that has sent a LISTEN for the event(s) you're using. It must poll the database by sending periodic empty queries ("") if you're using SSL; if you are not using SSL this can be avoided by use of the async notification checks. You'll need to unwrap the Connection object from your connection pool to be able to cast the underlying connection to a PgConnection to use listen/notify with. See related answer

The producer/consumer bit will be harder. To have multiple crash-safe concurrent consumers in PostgreSQL you need to use advisory locking with pg_try_advisory_lock(...). If you don't need concurrent consumers then it's easy, you just SELECT ... LIMIT 1 FOR UPDATE a row at a time.

Hopefully 9.4 will include an easier method of skipping locked rows with FOR UPDATE, as there's work in development for it.

于 2013-08-08T12:58:41.067 回答
3

要使用 postgres 的 LISTEN 和 NOTIFY,您需要使用可以支持异步通知的驱动程序。postgres JDBC 驱动程序不支持异步通知。

要不断地从 Application Server 的通道上侦听,请使用 pgjdbc-ng 0.6 驱动程序。

http://impossibl.github.io/pgjdbc-ng/

它支持异步通知,无需轮询。

于 2016-05-23T16:48:41.037 回答
3

一般来说,我会推荐使用EAI 模式来实现松散耦合。然后,如果您决定交换数据库,则索引端的代码不会更改。

如果您想坚持紧密耦合,我建议使用 LISTEN/NOTIFY。在 Java 中,使用pgjdbc-ng 驱动程序很重要,因为它支持异步通知而无需轮询。

这是一个异步模式(基于this answer):

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;    
import java.sql.Statement;

public static void listenToNotifyMessage() {
    PGDataSource dataSource = new PGDataSource();
    dataSource.setHost("localhost");
    dataSource.setPort(5432);
    dataSource.setDatabase("database_name");
    dataSource.setUser("postgres");
    dataSource.setPassword("password");

    PGNotificationListener listener = (int processId, String channelName, String payload) 
       -> System.out.println("notification = " + payload);

    try (PGConnection connection = (PGConnection) dataSource.getConnection()) {
        Statement statement = connection.createStatement();
        statement.execute("LISTEN test");
        statement.close();
        connection.addNotificationListener(listener);
        // it only works if the connection is open. Therefore, we do an endless loop here.
        while (true) {
           Thread.sleep(500);
       }
    } catch (Exception e) {
        System.err.println(e);
    }
}

在其他语句中,您现在可以执行NOTIFY test, 'This is a payload';. 您还可以NOTIFY在触发器等中执行。

于 2016-09-12T09:16:52.297 回答