如何在运行状态下更新更改源处理器(Change Feed Processor)所使用的 Cosmos DB 主密钥?
我在 Spring Boot 应用程序中使用更改源处理器(Change Feed Processor),它工作正常。但是,一旦重新生成主密钥,处理器就会失败;由于它作为后台进程(异步)运行,我无法捕获该异常,因此也无法改用辅助密钥更新密钥或重新启动处理器。以下是详细信息:
Pom.xml 中的依赖项
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.17.0</version>
<exclusions>
<exclusion>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.9</version>
</dependency>
代码片段
// Placeholder for the Cosmos DB account URL. It is passed to CosmosClientBuilder.endpoint(...)
// and is also reused as the change feed processor hostName(...) in startProcess().
// NOTE(review): hostName is presumably meant to identify this processor instance rather than
// the account endpoint — confirm against the ChangeFeedProcessorBuilder documentation.
String cosmoshost = "<cosmos db host url>";
/**
 * Builds a change feed processor over the "feedcontainer" / "leasecontainer" containers of
 * database "DB" and starts it asynchronously.
 *
 * <p>A single {@link CosmosAsyncClient} is created and shared by both containers instead of
 * building one client per container. Any exception raised while building the processor, and any
 * error signalled by the {@code start()} publisher, is logged together with its stack trace;
 * nothing is rethrown to the caller.
 *
 * <p>NOTE(review): failures that happen later inside the processor's background polling (for
 * example a 401 after the account key is regenerated) are presumably not delivered to the
 * {@code start()} subscriber — confirm. For live key rotation consider building the client with
 * {@code CosmosClientBuilder.credential(AzureKeyCredential)} and calling
 * {@code AzureKeyCredential.update(newKey)} instead of rebuilding the client.
 */
public void startProcess(){
    try {
        ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        changeFeedProcessorOptions.setFeedPollDelay(Duration.ofMillis(60000));
        changeFeedProcessorOptions.setLeasePrefix("SpringBoot");

        // Create the client once; the feed and lease containers share its connections.
        CosmosAsyncClient cosmosAsyncClient = getCosmosAsyncClient();

        new ChangeFeedProcessorBuilder().hostName(cosmoshost)
            .feedContainer(cosmosAsyncClient.getDatabase("DB").getContainer("feedcontainer"))
            .leaseContainer(cosmosAsyncClient.getDatabase("DB").getContainer("leasecontainer"))
            .options(changeFeedProcessorOptions)
            .handleChanges(docs -> {
                try {
                    for (JsonNode doc : docs) {
                        logger.debug("Received a record {}", doc);
                    }
                } catch (Exception e) {
                    // Keep the handler from propagating; pass the throwable so the stack trace is logged.
                    logger.error("Exception: {}", e.getMessage(), e);
                }
            })
            .buildChangeFeedProcessor()
            .start()
            .subscribe(
                ignored -> { },
                // Without an error consumer a failed start signal would be dropped by Reactor.
                error -> logger.error("Exception: {}", error.getMessage(), error));
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Returns a handle to the given container in database "DB".
 *
 * <p>The client factory declares a checked {@code Exception}; because this method's signature
 * declares none, a checked failure is rethrown as an unchecked {@link IllegalStateException}
 * with the original exception as its cause. Runtime exceptions are propagated unchanged.
 *
 * @param containerId id of the container to look up
 * @return the asynchronous container handle
 * @throws IllegalStateException if the client could not be created
 */
private CosmosAsyncContainer getCosmosAsyncContainer(String containerId) {
    final CosmosAsyncClient cosmosAsyncClient;
    try {
        cosmosAsyncClient = getCosmosAsyncClient();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new IllegalStateException(
            "Unable to create Cosmos client for container " + containerId, e);
    }
    return cosmosAsyncClient.getDatabase("DB").getContainer(containerId);
}
/**
 * Convenience overload: requests a client with {@code isPrimaryKey} set to {@code true}.
 *
 * @return the client produced by {@link #getCosmosAsyncClient(boolean)}
 * @throws Exception whatever the delegated overload throws
 */
private CosmosAsyncClient getCosmosAsyncClient() throws Exception {
    final boolean startWithPrimaryKey = true;
    return getCosmosAsyncClient(startWithPrimaryKey);
}
/**
 * Builds a {@link CosmosAsyncClient} for {@code cosmoshost} using either the primary or the
 * secondary key.
 *
 * <p>If building with the primary key throws, the failure is logged and the build is retried
 * once with the secondary key. If the secondary attempt also fails, its exception is thrown with
 * the primary-key failure attached as a suppressed exception so neither cause is lost.
 *
 * <p>The "changeFeed" user-agent suffix is set on the builder itself; previously it was set on a
 * {@code ConnectionPolicy} object that was never handed to the builder and so had no effect.
 *
 * @param isPrimaryKey {@code true} to use the primary key (with fallback to the secondary),
 *                     {@code false} to use the secondary key only
 * @return the constructed client
 * @throws Exception if the client cannot be built with the secondary key
 */
private CosmosAsyncClient getCosmosAsyncClient(boolean isPrimaryKey) throws Exception {
    String key = isPrimaryKey ? "primarykey" : "secondarykey";
    try {
        return new CosmosClientBuilder().endpoint(cosmoshost).key(key)
            .userAgentSuffix("changeFeed")
            .consistencyLevel(ConsistencyLevel.EVENTUAL)
            .readRequestsFallbackEnabled(true).multipleWriteRegionsEnabled(true)
            .contentResponseOnWriteEnabled(true).buildAsyncClient();
    } catch (Exception exception) {
        // Pass the throwable as the last argument so the stack trace is logged too.
        logger.error("Failed: {}", exception.getMessage(), exception);
        if (!isPrimaryKey) {
            throw exception;
        }
        try {
            return getCosmosAsyncClient(false);
        } catch (Exception secondaryFailure) {
            // Keep the primary-key failure visible alongside the secondary one.
            secondaryFailure.addSuppressed(exception);
            throw secondaryFailure;
        }
    }
}
错误:
[2021-08-11T22:38:59.059Z] [com.azure.cosmos.implementation.query.DocumentProducer] [cosmos-rntbd-nio-5-6] [199] [ERROR] [] Unexpected failure
com.azure.cosmos.implementation.UnauthorizedException: {"innerErrorMessage":"[\"The MAC signature found in the HTTP request is not the same as the computed signature. Server used following string to sign - 'post\\ndocs\\ndbs/DB/colls/leases\\nwed, 11 aug 2021 17:08:59 gmt\\n\\n'. Learn more: https://aka.ms/cosmosdb-tsg-mac-signature\"]","cosmosDiagnostics":{"userAgent":"azsdk-java-cosmos/4.17.0 Windows10/10.0 JRE/1.8.0_251","requestLatencyInMs":311,"requestStartTimeUTC":"2021-08-11T17:08:59.068Z","requestEndTimeUTC":"2021-08-11T17:08:59.379Z","responseStatisticsList":[{"storeResult":{"storePhysicalAddress":"rntbd://cdb-ms-prod-centralus1-fd46.documents.azure.com:14436/apps/9a765551-fd9d-4ff2-9920-8e076aea3c8e/services/bbd91f20-33de-4bd4-b65c-fbdc3a51fa5a/partitions/813f95e0-a7fb-4137-b4ba-fa779467f66e/replicas/132731745009873205p/","lsn":38,"globalCommittedLsn":38,"partitionKeyRangeId":"","isValid":true,"statusCode":401,"subStatusCode":0,"isGone":false,"isNotFound":false,"isInvalidPartition":false,"isThroughputControlRequestRateTooLarge":false,"requestCharge":0.0,"itemLSN":-1,"sessionToken":"-1#38","backendLatencyInMs":0.17,"exception":"[\"The MAC signature found in the HTTP request is not the same as the computed signature. Server used following string to sign - 'post\\ndocs\\ndbs/DB/colls/leases\\nwed, 11 aug 2021 17:08:59 gmt\\n\\n'. 
Learn more: https://aka.ms/cosmosdb-tsg-mac-signature\"]","transportRequestTimeline":[{"eventName":"created","startTimeUTC":"2021-08-11T17:08:59.070Z","durationInMicroSec":0},{"eventName":"queued","startTimeUTC":"2021-08-11T17:08:59.070Z","durationInMicroSec":0},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2021-08-11T17:08:59.070Z","durationInMicroSec":1000},{"eventName":"pipelined","startTimeUTC":"2021-08-11T17:08:59.071Z","durationInMicroSec":1000},{"eventName":"transitTime","startTimeUTC":"2021-08-11T17:08:59.072Z","durationInMicroSec":305000},{"eventName":"received","startTimeUTC":"2021-08-11T17:08:59.377Z","durationInMicroSec":1000},{"eventName":"completed","startTimeUTC":"2021-08-11T17:08:59.378Z","durationInMicroSec":1000}],"rntbdRequestLengthInBytes":629,"rntbdResponseLengthInBytes":463,"requestPayloadLengthInBytes":223,"responsePayloadLengthInBytes":null,"channelTaskQueueSize":1,"pendingRequestsCount":1,"serviceEndpointStatistics":{"availableChannels":1,"acquiredChannels":0,"executorTaskQueueSize":0,"inflightRequests":1,"lastSuccessfulRequestTime":"2021-08-11T17:08:13.091Z","lastRequestTime":"2021-08-11T17:08:58.482Z","createdTime":"2021-08-11T17:02:16.015Z","isClosed":false}},"requestResponseTimeUTC":"2021-08-11T17:08:59.379Z","requestResourceType":"Document","requestOperationType":"Query"}],"supplementalResponseStatisticsList":[],"addressResolutionStatistics":{},"regionsContacted":["https://keyvaultmanagedcosmos-centralus.documents.azure.com:443/"],"retryContext":{"statusAndSubStatusCodes":null,"retryCount":0,"retryLatency":0},"metadataDiagnosticsContext":{"metadataDiagnosticList":null},"serializationDiagnosticsContext":{"serializationDiagnosticsList":null},"gatewayStatistics":null,"systemInformation":{"usedMemory":"170650 KB","availableMemory":"3507046 KB","systemCpuLoad":"(2021-08-11T17:08:33.546Z 20.4%), (2021-08-11T17:08:38.552Z 14.8%), (2021-08-11T17:08:43.546Z 15.2%), (2021-08-11T17:08:48.547Z 17.0%), (2021-08-11T17:08:53.557Z 19.8%), 
(2021-08-11T17:08:58.553Z 14.4%)","availableProcessors":8},"clientCfgs":{"id":-3,"connectionMode":"DIRECT","numberOfClients":5,"connCfg":{"rntbd":"(cto:PT5S, rto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:false)","gw":"(cps:1000, rto:PT5S, icto:null, p:false)","other":"(ed: true, cs: false)"},"consistencyCfg":"(consistency: Eventual, mm: true, prgns: [])"}}}
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceived(RntbdRequestManager.java:848) [azure-cosmos-4.17.0.jar:?]
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:181) [azure-cosmos-4.17.0.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.65.Final.jar:4.1.65.Final]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_251]