1

我正在尝试使用 spring-data-r2dbc。我在 Postgresql 上试试这个。我之前尝试过 spring-data-mongodb-reactive 。我忍不住比较了两者。

我看到尚不支持查询派生。但我想知道是否有等效的@Tailable. 这样我就可以实时收到数据库更改的通知。任何人都可以分享与此相关的任何代码示例。

我知道底层数据库应该支持这一点。我相信 Postgresql 确实支持这种使用逻辑解码的东西(如果我在这里错了,请纠正我)。

@Tailablespring-data-r2dbc 中是否有等价物?

4

1 回答 1

4

我遇到了同样的问题,不确定您是否找到了解决方案,但我可以通过执行以下操作来完成类似的事情。首先,我在表中添加了触发器

CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

每当更新、删除或插入行时,这将在表上设置触发器。然后它将调用我设置的触发函数,如下所示:

CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

这将允许我从我的 Spring Boot 项目中“收听”任何这些更新,并将整行作为有效负载发送。接下来,在我的 Spring Boot 项目中,我配置了与我的数据库的连接。

@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

有了它,我将其自动装配(依赖注入)到我的服务类中的构造函数中,并将其转换为 r2dbc PostgressqlConnection 类,如下所示:

this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

现在我们想“监听”我们的表并在对我们的表执行一些更新时得到通知。为此,我们使用 @PostContruct 注释设置了一个在依赖注入之后执行的初始化方法

@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

请注意,我们监听了我们在 pg_notify 方法中放置的任何名称。我们还想设置一个方法来在 bean 即将被扔掉时关闭连接,如下所示:

@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

现在我只需创建一个方法,该方法返回当前表中任何内容的 Flux,并且我还将它与通知合并,正如我在通知以 json 形式出现之前所说的那样,所以我必须反序列化它并决定使用对象映射器。所以,它看起来像这样:

private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

希望这可以帮助。干杯!

于 2020-05-15T17:03:28.877 回答