我遇到了同样的问题,不确定您是否找到了解决方案,但我可以通过执行以下操作来完成类似的事情。首先,我在表中添加了触发器
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
每当更新、删除或插入行时,这将在表上设置触发器。然后它将调用我设置的触发函数,如下所示:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
这将允许我从我的 Spring Boot 项目中“收听”任何这些更新,并将整行作为有效负载发送。接下来,在我的 Spring Boot 项目中,我配置了与我的数据库的连接。
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
有了它,我将其自动装配(依赖注入)到我的服务类中的构造函数中,并将其转换为 r2dbc PostgressqlConnection 类,如下所示:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
现在我们想“监听”我们的表并在对我们的表执行一些更新时得到通知。为此,我们使用 @PostContruct 注释设置了一个在依赖注入之后执行的初始化方法
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
请注意,我们监听了我们在 pg_notify 方法中放置的任何名称。我们还想设置一个方法来在 bean 即将被扔掉时关闭连接,如下所示:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
现在我只需创建一个方法,该方法返回当前表中任何内容的 Flux,并且我还将它与通知合并,正如我在通知以 json 形式出现之前所说的那样,所以我必须反序列化它并决定使用对象映射器。所以,它看起来像这样:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
希望这可以帮助。干杯!