我有一个 Go 服务,它对外提供 gRPC 接口,同时调用另一个 gRPC 服务;后者是基于 Java 的 gRPC 服务,已经通过 OpenTelemetry 正确实现了链路跟踪。
在我的 Go 服务中,我从 opentelemetry-go-contrib 仓库复制了 interceptor.go 和 grpctrace.go 这两个文件,地址:https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/google.golang.org/grpc/otelgrpc
现在,我创建一个名为 config.go 的文件:
package grpcTracing
import (
"context"
"log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// Init configures an OpenTelemetry exporter and trace provider
func Init() {
ctx := context.Background()
driver := otlpgrpc.NewDriver(
otlpgrpc.WithInsecure(),
otlpgrpc.WithEndpoint("localhost:55680"),
)
exporter, err := otlp.NewExporter(ctx, driver) // Configure as needed.
if err != nil {
log.Fatal(err)
}
service := "test-service"
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithResource(resource.NewWithAttributes(
label.Key("service.name").String(service),
)),
sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tracerProvider)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
}
现在,当我启动我的 gRPC 服务器时,我会这样做:
// Install the global tracer provider and propagator first; the initializer
// exported by config.go is named Init.
grpcTracing.Init()
...
// Register the tracing interceptor so that unary calls go through it.
grpcServer := grpc.NewServer(
	grpc.UnaryInterceptor(grpcTracing.UnaryServerInterceptor()),
)
这是服务端拦截器,每个请求都会调用它:
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor that runs every
// unary call inside a server-kind span.
//
// For each call it extracts baggage entries and a remote span context from a
// copy of the incoming metadata, starts a span whose parent is that remote
// span context, records the request and response as span attributes, invokes
// the handler with the span-carrying context, and finally records the gRPC
// status code (plus an error status when the handler fails).
//
// opts are forwarded to Extract, Inject and newConfig.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		// The "ok" result is deliberately ignored; the metadata is used as returned.
		incomingMD, _ := metadata.FromIncomingContext(ctx)
		mdCopy := incomingMD.Copy()

		baggageEntries, remoteSpanCtx := Extract(ctx, &mdCopy, opts...)
		ctx = baggage.ContextWithValues(ctx, baggageEntries...)

		cfg := newConfig(opts)
		tracer := cfg.TracerProvider.Tracer(
			instrumentationName,
			trace.WithInstrumentationVersion(otelcontrib.SemVersion()),
		)

		spanName, spanAttrs := spanInfo(info.FullMethod, peerFromCtx(ctx))
		parentCtx := trace.ContextWithRemoteSpanContext(ctx, remoteSpanCtx)
		ctx, span := tracer.Start(
			parentCtx,
			spanName,
			trace.WithSpanKind(trace.SpanKindServer),
			trace.WithAttributes(spanAttrs...),
		)
		defer span.End()

		// NOTE(review): Inject writes into the metadata obtained from the
		// incoming context, while the context handed to the handler carries
		// the copy taken before injection — confirm this is intended.
		Inject(ctx, &incomingMD, opts...)
		ctx = metadata.NewIncomingContext(ctx, mdCopy)

		messageReceived.Event(ctx, 1, req)
		span.SetAttributes(label.Any("request", req))

		resp, err := handler(ctx, req)
		span.SetAttributes(label.Any("response", resp))

		if err == nil {
			span.SetAttributes(statusCodeAttr(grpc_codes.OK))
			messageSent.Event(ctx, 1, resp)
			return resp, nil
		}

		// Failure path: surface the gRPC status on the span and emit the
		// status proto as the sent message.
		st, _ := status.FromError(err)
		span.SetStatus(codes.Error, st.Message())
		span.SetAttributes(statusCodeAttr(st.Code()))
		messageSent.Event(ctx, 1, st.Proto())
		return resp, err
	}
}
使用 gRPC java 服务的客户端:
// FindDigitalCertificate asks the remote Andes gRPC service for the digital
// certificates matching the given transaction and customer.
//
// When g.Client is nil a connection to g.endPoint is dialed with the tracing
// client interceptor installed; that connection is closed when this method
// returns. g.Client is reset to nil before returning, on success and on
// failure alike. A dial failure terminates the process via log.Fatalf.
//
// On an RPC error it returns an empty slice together with a business error
// built with http.StatusPartialContent; otherwise it returns the certificates
// converted by BuildResponseCertificates.
//
// NOTE(review): the RPC context is rooted at context.Background(), so it
// carries no span from the caller and the client span started by the
// interceptor cannot become a child of a server span. Correlating the two
// requires the caller's context to reach this call — TODO confirm how to
// thread it through without breaking existing callers.
func (g *GrpcAndesClient) FindDigitalCertificate(transaction *entities.Transaction, customer *entities.Customer) ([]entities.Certificate, error) {
	if g.Client == nil {
		opts := []grpc.DialOption{
			grpc.WithInsecure(),
			grpc.WithUnaryInterceptor(grpcTracing.UnaryClientInterceptor()), // <--Client interceptor
		}
		conn, err := grpc.Dial(g.endPoint, opts...)
		if err != nil {
			log.Fatalf(dialGrpcServiceError, err)
		}
		g.Client = generatedClient.NewAndesClientAppClient(conn)
		defer conn.Close()
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(g.timeout)*time.Second)
	defer cancel()

	request := BuildFindCertificateClientRequest(transaction, customer)

	// Attach the outgoing metadata to the timeout-bound context. Building it
	// from context.Background() here would silently drop the deadline set
	// above and let the RPC run without any timeout.
	md := metadata.Pairs()
	ctx = metadata.NewOutgoingContext(ctx, md)

	clientResponse, err := g.Client.FindDigitalCertificate(ctx, request)
	if err != nil {
		g.Client = nil
		return make([]entities.Certificate, 0),
			errcatalogs.MakeBusinessResponseError(
				http.StatusPartialContent, fmt.Sprintf(andesClientFindCertificatesErrorMessage, err.Error()))
	}

	responseCertificates := BuildResponseCertificates(clientResponse)
	g.Client = nil
	return responseCertificates, nil
}
并且,客户端拦截器:
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor that runs every
// outgoing unary call inside a client-kind span.
//
// For each call it starts a span from the context supplied by the caller,
// injects the resulting context into a copy of the outgoing metadata, invokes
// the RPC with that metadata attached, and records the gRPC status code on
// the span (plus an error status when the call fails).
//
// opts are forwarded to Inject and newConfig.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		callOpts ...grpc.CallOption,
	) error {
		// Work on a copy so the metadata held by the caller's context is left alone.
		outgoingMD, _ := metadata.FromOutgoingContext(ctx)
		mdCopy := outgoingMD.Copy()

		cfg := newConfig(opts)
		tracer := cfg.TracerProvider.Tracer(
			instrumentationName,
			trace.WithInstrumentationVersion(otelcontrib.SemVersion()),
		)

		spanName, spanAttrs := spanInfo(method, cc.Target())
		// The span is started from the incoming ctx as passed in by the caller.
		ctx, span := tracer.Start(
			ctx,
			spanName,
			trace.WithSpanKind(trace.SpanKindClient),
			trace.WithAttributes(spanAttrs...),
		)
		defer span.End()

		Inject(ctx, &mdCopy, opts...)
		ctx = metadata.NewOutgoingContext(ctx, mdCopy)

		messageSent.Event(ctx, 1, req)
		err := invoker(ctx, method, req, reply, cc, callOpts...)
		messageReceived.Event(ctx, 1, reply)

		if err == nil {
			span.SetAttributes(statusCodeAttr(grpc_codes.OK))
			return nil
		}

		// Failure path: surface the gRPC status on the span.
		st, _ := status.FromError(err)
		span.SetStatus(codes.Error, st.Message())
		span.SetAttributes(statusCodeAttr(st.Code()))
		return err
	}
}
这会生成两个 span(跨度):第一个包含服务端的请求和响应,第二个包含客户端信息,但这两个 span 之间没有关联。
我应该怎么做,才能让这两个拦截器生成的 span 关联起来?