我正在使用 reactor java 使用 r2dbc 对 Postgres 运行定期任务,如下所示;
Flux.interval(Duration.ofMillis(1000)).doOnNext(i->{
System.out.print("TIME HAS TICKED\n");
Flux.range(0,10).flatMap(j->{
return service.getJob(this.consumerQueueName, this.filter).then();
}).subscribe();
}).subscribe();
大约 5 分钟后,它停止处理作业,当我检查 postgres 连接都空闲时:
select datname as database_name,
client_addr as client_address,
application_name,
backend_start,
state,
state_change
from pg_stat_activity;
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.786098 idle 2020-09-18 04:11:40.471893
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.785822 idle 2020-09-18 04:12:01.196558
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.785598 idle 2020-09-18 04:11:50.971738
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.785317 idle 2020-09-18 04:11:30.506207
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.665800 idle 2020-09-18 04:11:20.570714
如何适当地使用 r2dbc 和 databaseClient 定期从表中获取数据而不会导致此异常?
//ConnectionFactory Settings:
ConnectionFactories.get(
ConnectionFactoryOptions.builder()
.option(Option.valueOf("driver"), "pool")
.option(Option.valueOf("protocol"), "postgresql")
//.option(ConnectionFactoryOptions.DRIVER, "postgresql")
.option(ConnectionFactoryOptions.HOST, "localhost")
.option(ConnectionFactoryOptions.PORT, 5432) // optional, defaults to 5432
.option(ConnectionFactoryOptions.USER, "db")
.option(ConnectionFactoryOptions.DATABASE, "integrity_service")
.option(MAX_SIZE, 5)
.build());
private final String fetchJobFormat =
" WITH cte AS ( SELECT id FROM %s WHERE chain_id='%s' and is_complete=%b ORDER BY id ASC LIMIT 1\n" +
" )\n" +
" UPDATE queue q\n" +
" SET timestamp = extract(epoch from now()),\n" +
" is_complete = TRUE\n" +
" FROM cte WHERE q.id = cte.id\n" +
" RETURNING q.id, q.chain_id,q.timestamp, q.is_complete, q.payload";
public Mono<Queue> getJob(String queue, String filter){
return databaseClient.execute(String.format(fetchJobFormat,queue,filter,false))
.fetch().all()
.flatMap((v) -> {
System.out.println("retrieved result" + v.get("id").toString());
Queue q = this.objectMapper.convertValue(v, Queue.class);
return Mono.just(q);
}).last();
}