我正在为 ksqlDB 开发一个 java 消费者,它使用 PULL 和 PUSH 查询。
目前,单个推送查询一次最多可以流式传输十分钟,之后如果消费者空闲,服务器将关闭连接。即使流或表发生变化,由于连接超时,我们也无法接收到这些变化。有什么办法可以使这种连接持续存在吗?
我正在为 ksqlDB 开发一个 java 消费者,它使用 PULL 和 PUSH 查询。
目前,单个推送查询一次最多可以流式传输十分钟,之后如果消费者空闲,服务器将关闭连接。即使流或表发生变化,由于连接超时,我们也无法接收到这些变化。有什么办法可以使这种连接持续存在吗?
一种解决方法是每隔几分钟 ping ksqlDB 服务器。但它显然会不必要地占用服务器的资源。 https://github.com/confluentinc/ksql/issues/6970
或者可以使用 Reactive Streams Subscriber 编写简单的 onError 方法(有关更多详细信息,请参阅https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/),其中可以编写重新订阅逻辑。因此,如果 ksqlDB 服务器在几个小时后发生下一次更改,onError 方法将捕获超时异常,然后我们可以重新订阅推送查询。
@Override
public synchronized void onError(Throwable t) {
logger.error("Received an error in onError method : " + t);
getPushQueryResult();
}
public void getPushQueryResult() {
try {
client.streamQuery("query").thenAccept(streamedQueryResult -> {
logger.info("Query has started. Query ID: " + streamedQueryResult.queryID());
RowSubscriber subscriber = new RowSubscriber();
streamedQueryResult.subscribe(subscriber);
}).exceptionally(e -> {
logger.error("Request failed : " + e);
return null;
});
} catch (Exception e) {
logger.error("Exception in getPushQueryResult method : " + e);
}
}