1

我正在为 ksqlDB 开发一个 java 消费者,它使用 PULL 和 PUSH 查询。

目前,单个推送查询一次最多可以流式传输十分钟,之后如果消费者空闲,服务器将关闭连接。即使流或表发生变化,由于连接超时,我们也无法接收到这些变化。有什么办法可以使这种连接持续存在吗?

4

1 回答 1

0

一种解决方法是每隔几分钟 ping ksqlDB 服务器。但它显然会不必要地占用服务器的资源。 https://github.com/confluentinc/ksql/issues/6970

或者可以使用 Reactive Streams Subscriber 编写简单的 onError 方法(有关更多详细信息,请参阅https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/),其中可以编写重新订阅逻辑。因此,如果 ksqlDB 服务器在几个小时后发生下一次更改,onError 方法将捕获超时异常,然后我们可以重新订阅推送查​​询。

@Override
    public synchronized void onError(Throwable t) {
        logger.error("Received an error in onError method : " + t);
        getPushQueryResult();
    }

public void getPushQueryResult() {
        try {
            client.streamQuery("query").thenAccept(streamedQueryResult -> {
                logger.info("Query has started. Query ID: " + streamedQueryResult.queryID());
                RowSubscriber subscriber = new RowSubscriber();
                streamedQueryResult.subscribe(subscriber);
            }).exceptionally(e -> {
                logger.error("Request failed : " + e);
                return null;
            });
        } catch (Exception e) {
            logger.error("Exception in getPushQueryResult method : " + e);
        }

    }
于 2022-02-08T07:37:19.853 回答