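I am trying to add OpenTelemetry to this microservice: https://github.com/dcdh/inner-friends-user-profile-picture
However, my E2ETest fails because one of my spans is missing. After digging, I found it, but it is not attached to its parent.
The orphan span "HazelcastUserProfilePictureCacheRepository.store" should be part of the second trace, which has two spans.
Sample failure code
I designed this microservice using a hexagonal architecture. The domain is asynchronous and uses Mutiny. On the infrastructure side, I use quarkus-smallrye-context-propagation to propagate the context.
For OpenTelemetry, I create new spans programmatically in OpenTelemetryTracingService,
so maybe I should inject a parent context there. However, I do not know how to do that (what is the right way to retrieve it and attach it to the span builder?).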
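To make the question concrete, here is a minimal, dependency-free sketch of what I mean by "injecting a parent context". `ToySpan`, `ToyTracer` and `ExplicitParentDemo` are hypothetical stand-ins, not the OpenTelemetry API and not code from the repository. In OpenTelemetry itself, the corresponding calls would be `Context.current()` to read the active context and `SpanBuilder.setParent(Context)` to attach it. The sketch only shows why a span started without a parent ends up in a trace of its own.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a span: it only records the ids that decide
// which trace it belongs to and which span is its parent.
final class ToySpan {
    final String name;
    final long traceId;
    final long spanId;
    final Long parentSpanId; // null means "root span"

    ToySpan(String name, long traceId, long spanId, Long parentSpanId) {
        this.name = name;
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }
}

// Hypothetical stand-in for a tracer with and without an explicit parent.
final class ToyTracer {
    private final AtomicLong ids = new AtomicLong();

    // No parent available: a new trace is started and the span becomes a root.
    ToySpan startSpan(String name) {
        return new ToySpan(name, ids.incrementAndGet(), ids.incrementAndGet(), null);
    }

    // Parent passed explicitly: the child reuses the parent's trace id.
    ToySpan startSpan(String name, ToySpan parent) {
        return new ToySpan(name, parent.traceId, ids.incrementAndGet(), parent.spanId);
    }
}

final class ExplicitParentDemo {
    public static void main(String[] args) {
        ToyTracer tracer = new ToyTracer();
        ToySpan request = tracer.startSpan("incoming-request"); // assumed root span name
        ToySpan orphan = tracer.startSpan("HazelcastUserProfilePictureCacheRepository.store");
        ToySpan attached = tracer.startSpan("HazelcastUserProfilePictureCacheRepository.store", request);

        System.out.println("orphan shares trace with request:   " + (orphan.traceId == request.traceId));
        System.out.println("attached shares trace with request: " + (attached.traceId == request.traceId));
    }
}
```

In this model the only difference between the orphan and the attached span is whether the parent was available when the span was started, which is the part I do not know how to get right in OpenTelemetryTracingService.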
For the save feature, the entry point is SaveUserProfilePictureUseCase:
    @Override
    public Uni<R> execute(final SaveUserProfilePictureCommand command,
                          final ResponseTransformer<R> responseTransformer) {
        return Uni.createFrom()
                // 1. store the picture in S3
                .deferred(() -> profilePictureRepository.save(command.userPseudo(), command.picture(), command.mediaType()))
                // 2. update the cache; a cache failure is ignored and the saved picture is passed on
                .chain(profilePictureSaved -> userProfilePictureCacheRepository.storeFeatured(command.userPseudo(), profilePictureSaved)
                        .onItemOrFailure().transform((item, exception) -> profilePictureSaved))
                .map(profilePictureSaved -> responseTransformer.toResponse(profilePictureSaved))
                .onFailure(ProfilePictureRepositoryException.class)
                .recoverWithItem(profilePictureRepositoryException -> responseTransformer.toResponse((ProfilePictureRepositoryException) profilePictureRepositoryException))
                .onFailure()
                .recoverWithItem(exception -> responseTransformer.toResponse(exception));
    }
It first calls the S3 storage in the infrastructure layer:
    @Override
    public Uni<ProfilePictureSaved> save(final UserPseudo userPseudo,
                                         final byte[] picture,
                                         final SupportedMediaType mediaType) throws ProfilePictureRepositoryException {
        return Uni.createFrom()
                .deferred(() -> {
                    final Span span = openTelemetryTracingService.startANewSpan("S3ProfilePictureRepository.save");
                    final PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                            .bucket(bucketUserProfilePictureName)
                            .key(s3ObjectKeyProvider.objectKey(userPseudo, mediaType).value())
                            .contentType(mediaType.contentType())
                            .build();
                    return Uni.createFrom()
                            .completionStage(() -> s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromBytes(picture)))
                            .map(putObjectResponse -> new S3ProfilePictureSaved(userPseudo, mediaType, putObjectResponse))
                            .onFailure(SdkException.class)
                            .transform(exception -> {
                                LOG.error(exception);
                                openTelemetryTracingService.markSpanInError(span);
                                return new ProfilePictureRepositoryException();
                            })
                            .onTermination()
                            .invoke(() -> openTelemetryTracingService.endSpan(span));
                });
    }
Then it calls the Hazelcast cache repository:
    @Override
    public Uni<CachedUserProfilePicture> storeFeatured(final UserPseudo userPseudo, final ProfilePictureIdentifier featured) {
        return Uni.createFrom()
                .deferred(() -> {
                    final Span span = openTelemetryTracingService.startANewSpan("HazelcastUserProfilePictureCacheRepository.store");
                    return Uni.createFrom()
                            .completionStage(() -> hazelcastInstance.getMap(MAP_NAME).getAsync(userPseudo.pseudo()))
                            .replaceIfNullWith(() -> HazelcastCachedUserProfilePicture.newBuilder().setUserPseudo(userPseudo.pseudo()).build())
                            .onItem().castTo(HazelcastCachedUserProfilePicture.class)
                            .chain(hazelcastCachedUserProfilePicture -> {
                                hazelcastCachedUserProfilePicture.setFeaturedProfilePictureIdentifier(new HazelcastProfilePictureIdentifier(featured));
                                return Uni.createFrom().completionStage(() -> hazelcastInstance.getMap(MAP_NAME).putAsync(userPseudo.pseudo(), hazelcastCachedUserProfilePicture))
                                        .map(response -> hazelcastCachedUserProfilePicture);
                            })
                            .onItem().castTo(HazelcastCachedUserProfilePicture.class)
                            .onTermination()
                            .invoke(() -> openTelemetryTracingService.endSpan(span));
                });
    }
Both repositories call OpenTelemetryTracingService to manage the span lifecycle.
Steps to reproduce
- check out the code
- run should_store_user_profile_picture in E2ETest
I guess the context is lost when the Unis are chained in SaveUserProfilePictureUseCase.
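To illustrate that guess, here is a small model using only the JDK. It is a sketch of the failure mode I suspect, not a diagnosis of Mutiny or quarkus-smallrye-context-propagation: `ContextLossDemo`, `CURRENT_SPAN` and the span name "request-span" are hypothetical, and a plain `ThreadLocal` stands in for the "current span" that a tracing library keeps per thread. When a continuation runs on another thread, the value is gone unless it was captured beforehand and restored around the continuation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextLossDemo {
    // Stand-in for the per-thread "current span" (hypothetical, not OpenTelemetry).
    static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

    // The continuation runs on the worker thread, where nothing was ever set.
    static String parentSeenWithoutPropagation(ExecutorService worker) {
        return CompletableFuture
                .supplyAsync(() -> "saved", worker)
                .thenApplyAsync(item -> String.valueOf(CURRENT_SPAN.get()), worker)
                .join();
    }

    // The parent is captured on the calling thread and restored in the continuation.
    static String parentSeenWithPropagation(ExecutorService worker) {
        final String captured = CURRENT_SPAN.get();
        return CompletableFuture
                .supplyAsync(() -> "saved", worker)
                .thenApplyAsync(item -> {
                    CURRENT_SPAN.set(captured);
                    try {
                        return String.valueOf(CURRENT_SPAN.get());
                    } finally {
                        CURRENT_SPAN.remove(); // do not leak the value into the pooled thread
                    }
                }, worker)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT_SPAN.set("request-span");
        try {
            System.out.println("without propagation: " + parentSeenWithoutPropagation(worker));
            System.out.println("with propagation:    " + parentSeenWithPropagation(worker));
        } finally {
            CURRENT_SPAN.remove();
            worker.shutdown();
        }
    }
}
```

If something like the first variant is happening between the S3 call and the Hazelcast call, the second span would be started with no parent in scope, which would match the orphan span I see.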
Can you tell me what is wrong with this implementation?
Thanks in advance.
Regards,
Damien