I'm using Reactor (Java) with r2dbc against Postgres to run a periodic task, as follows:

Flux.interval(Duration.ofMillis(1000)).doOnNext(i -> {
    System.out.print("TIME HAS TICKED\n");
    Flux.range(0, 10).flatMap(j -> {
        return service.getJob(this.consumerQueueName, this.filter).then();
    }).subscribe();
}).subscribe();

After about 5 minutes it stops processing jobs, and when I check Postgres, all the connections are idle:


select datname as database_name,
       client_addr as client_address,
       application_name,
       backend_start,
       state,
       state_change
from pg_stat_activity;

integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.786098  idle    2020-09-18 04:11:40.471893
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785822  idle    2020-09-18 04:12:01.196558
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785598  idle    2020-09-18 04:11:50.971738
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785317  idle    2020-09-18 04:11:30.506207
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.665800  idle    2020-09-18 04:11:20.570714

How do I properly use r2dbc and databaseClient to periodically fetch data from a table without running into this problem?

//ConnectionFactory Settings:

ConnectionFactories.get(
                ConnectionFactoryOptions.builder()
                        .option(Option.valueOf("driver"), "pool")
                        .option(Option.valueOf("protocol"), "postgresql")
                        //.option(ConnectionFactoryOptions.DRIVER, "postgresql")
                        .option(ConnectionFactoryOptions.HOST, "localhost")
                        .option(ConnectionFactoryOptions.PORT, 5432)  // optional, defaults to 5432
                        .option(ConnectionFactoryOptions.USER, "db")
                        .option(ConnectionFactoryOptions.DATABASE, "integrity_service")
                        .option(MAX_SIZE, 5)
                        .build());

private final String fetchJobFormat =
            " WITH cte AS ( SELECT id FROM %s WHERE chain_id='%s' and is_complete=%b  ORDER BY id ASC LIMIT 1\n" +
                    "    )\n" +
                    "                UPDATE queue q\n" +
                    "                SET timestamp = extract(epoch from now()),\n" +
                    "                is_complete = TRUE\n" +
                    "                FROM cte WHERE q.id  = cte.id\n" +
                    "                RETURNING q.id, q.chain_id,q.timestamp, q.is_complete, q.payload";

public Mono<Queue> getJob(String queue, String filter) {
    return databaseClient.execute(String.format(fetchJobFormat, queue, filter, false))
            .fetch().all()
            .flatMap((v) -> {
                System.out.println("retrieved result" + v.get("id").toString());
                Queue q = this.objectMapper.convertValue(v, Queue.class);
                return Mono.just(q);
            }).last();
}
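
To make the statement that getJob sends easier to inspect, here is a minimal sketch that renders the same template with String.format, outside of any database call. The class name FetchJobSqlDemo and the sample values "queue" and "chain-1" are made up for illustration; the template text is the one shown above. Note that the first placeholder only fills in the table name inside the CTE, while the UPDATE always targets the table named queue.

```java
public class FetchJobSqlDemo {

    // Same template text as fetchJobFormat in the question.
    static final String FETCH_JOB_FORMAT =
            " WITH cte AS ( SELECT id FROM %s WHERE chain_id='%s' and is_complete=%b  ORDER BY id ASC LIMIT 1\n" +
                    "    )\n" +
                    "                UPDATE queue q\n" +
                    "                SET timestamp = extract(epoch from now()),\n" +
                    "                is_complete = TRUE\n" +
                    "                FROM cte WHERE q.id  = cte.id\n" +
                    "                RETURNING q.id, q.chain_id,q.timestamp, q.is_complete, q.payload";

    // Renders the SQL exactly the way getJob does: table name, chain id, is_complete=false.
    static String render(String queue, String filter) {
        return String.format(FETCH_JOB_FORMAT, queue, filter, false);
    }

    public static void main(String[] args) {
        // "queue" and "chain-1" are hypothetical sample values.
        System.out.println(render("queue", "chain-1"));
    }
}
```

Printing the rendered text this way lets the statement be pasted into psql and run by hand against the same database.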