3

我有一个 Go 服务,它对外提供 gRPC 接口,同时会调用另一个 gRPC 服务;后者是基于 gRPC Java 实现的服务,已经通过 OpenTelemetry 正确接入了链路追踪。

在我的 Go 服务中,我从 opentelemetry-go-contrib 仓库复制了这两个文件:interceptor.go 和 grpctrace.go,地址是 https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/google.golang.org/grpc/otelgrpc

现在,我创建一个名为 config.go 的文件:

package grpcTracing

import (
  "context"
  "log"

  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/exporters/otlp"
  "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
  "go.opentelemetry.io/otel/label"
  "go.opentelemetry.io/otel/propagation"
  "go.opentelemetry.io/otel/sdk/resource"
  sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Init configures an OpenTelemetry exporter and trace provider
func Init() {

  ctx := context.Background()

  driver := otlpgrpc.NewDriver(
    otlpgrpc.WithInsecure(),
    otlpgrpc.WithEndpoint("localhost:55680"),
  )

  exporter, err := otlp.NewExporter(ctx, driver) // Configure as needed.
  if err != nil {
    log.Fatal(err)
  }

  service := "test-service"

  tracerProvider := sdktrace.NewTracerProvider(
    sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
    sdktrace.WithResource(resource.NewWithAttributes(
      label.Key("service.name").String(service),
    )),
    sdktrace.WithBatcher(exporter),
  )

  otel.SetTracerProvider(tracerProvider)
  otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
}

现在,当我启动我的 gRPC 服务器时,我会这样做:

        // Init is the exported setup function declared in config.go; it must
        // run before the server is created so the global tracer provider and
        // propagator are in place when the interceptor handles a request.
        grpcTracing.Init()
        ...
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(grpcTracing.UnaryServerInterceptor()),
    )

这是服务端拦截器,每个请求都会调用它:

// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor that wraps
// every unary call handled by the server in a server-kind span.
//
// For each request the interceptor:
//   - copies the incoming metadata and extracts baggage entries and the
//     remote span context from that copy;
//   - starts a span (name and attributes come from spanInfo) whose parent is
//     the extracted remote span context;
//   - injects the new context into the metadata copy and installs the copy as
//     the incoming metadata seen by the handler;
//   - records the request and response as span attributes and message events;
//   - records the gRPC status code and marks the span as failed when the
//     handler returns an error.
//
// The handler's response and error are returned unchanged.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
  return func(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
  ) (interface{}, error) {
    // The ok flag is ignored: the copy below is taken whether or not the
    // context carried metadata.
    requestMetadata, _ := metadata.FromIncomingContext(ctx)
    metadataCopy := requestMetadata.Copy()

    entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
    ctx = baggage.ContextWithValues(ctx, entries...)

    tracer := newConfig(opts).TracerProvider.Tracer(
      instrumentationName,
      trace.WithInstrumentationVersion(otelcontrib.SemVersion()),
    )

    name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))

    ctx, span := tracer.Start(
      trace.ContextWithRemoteSpanContext(ctx, spanCtx),
      name,
      trace.WithSpanKind(trace.SpanKindServer),
      trace.WithAttributes(attr...),
    )
    defer span.End()

    // Inject into the copy, which is the map installed on the context just
    // below. Injecting into requestMetadata had no visible effect because
    // that map is never attached to the new context, and it may be nil when
    // the request carried no metadata (presumably a panic on write; verify
    // against Inject).
    Inject(ctx, &metadataCopy, opts...)
    ctx = metadata.NewIncomingContext(ctx, metadataCopy)

    messageReceived.Event(ctx, 1, req)

    span.SetAttributes(label.Any("request", req))

    resp, err := handler(ctx, req)

    span.SetAttributes(label.Any("response", resp))

    if err != nil {
      // A non-status error still yields a usable *status.Status, so the ok
      // flag is not needed here.
      s, _ := status.FromError(err)
      span.SetStatus(codes.Error, s.Message())
      span.SetAttributes(statusCodeAttr(s.Code()))
      messageSent.Event(ctx, 1, s.Proto())
    } else {
      span.SetAttributes(statusCodeAttr(grpc_codes.OK))
      messageSent.Event(ctx, 1, resp)
    }

    return resp, err
  }
}

调用 gRPC Java 服务的客户端:

// FindDigitalCertificate asks the Andes gRPC service for the digital
// certificates matching the given transaction and customer.
//
// When g.Client is nil, a connection to g.endPoint is dialed with the tracing
// client interceptor installed and closed when this method returns. A dial
// error terminates the process via log.Fatalf. g.Client is always reset to nil
// before returning.
//
// The RPC is bounded by a deadline of g.timeout seconds; a non-positive
// g.timeout therefore makes the call fail immediately with a deadline error.
//
// On an RPC failure it returns an empty, non-nil slice together with a
// business error carrying http.StatusPartialContent.
//
// NOTE(review): the RPC context is rooted at context.Background() because the
// method receives no context, so a span started by a server interceptor for
// the inbound request cannot become the parent of the client span created
// here. Linking the two needs the handler's ctx to be passed in, which is a
// signature change.
func (g *GrpcAndesClient) FindDigitalCertificate(transaction *entities.Transaction, customer *entities.Customer) ([]entities.Certificate, error) {
    if g.Client == nil {
        opts := []grpc.DialOption{
            grpc.WithInsecure(),
            grpc.WithUnaryInterceptor(grpcTracing.UnaryClientInterceptor()), // <--Client interceptor
        }
        conn, err := grpc.Dial(g.endPoint, opts...)
        if err != nil {
            log.Fatalf(dialGrpcServiceError, err)
        }
        defer conn.Close()
        g.Client = generatedClient.NewAndesClientAppClient(conn)
    }
    // The client is dropped after every call, on success and on failure alike.
    defer func() { g.Client = nil }()

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(g.timeout)*time.Second)
    defer cancel()

    // Derive the outgoing context from the timeout context. Rebuilding it from
    // context.Background() discarded the deadline set above.
    ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs())

    request := BuildFindCertificateClientRequest(transaction, customer)

    clientResponse, err := g.Client.FindDigitalCertificate(ctx, request)
    if err != nil {
        return make([]entities.Certificate, 0),
            errcatalogs.MakeBusinessResponseError(
                http.StatusPartialContent, fmt.Sprintf(andesClientFindCertificatesErrorMessage, err.Error()))
    }
    return BuildResponseCertificates(clientResponse), nil
}

并且,客户端拦截器:

// UnaryClientInterceptor builds a grpc.UnaryClientInterceptor that wraps each
// outgoing unary call in a client-kind span.
//
// The span is started from the context passed to the call. That context is
// then injected into a copy of the outgoing metadata, and the copy is attached
// to the context handed to the invoker. Message events are emitted for the
// request and the reply, and the gRPC status code is stored on the span. A
// failed call also sets the span status to error. The invoker's error is
// returned unchanged.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
  return func(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    callOpts ...grpc.CallOption,
  ) error {
    tracer := newConfig(opts).TracerProvider.Tracer(
      instrumentationName,
      trace.WithInstrumentationVersion(otelcontrib.SemVersion()),
    )

    // The ok flag is ignored; a copy is taken either way so injection never
    // writes into the map obtained from the caller's context.
    outgoing, _ := metadata.FromOutgoingContext(ctx)
    carrier := outgoing.Copy()

    spanName, spanAttrs := spanInfo(method, cc.Target())
    ctx, span := tracer.Start(
      ctx,
      spanName,
      trace.WithSpanKind(trace.SpanKindClient),
      trace.WithAttributes(spanAttrs...),
    )
    defer span.End()

    Inject(ctx, &carrier, opts...)
    ctx = metadata.NewOutgoingContext(ctx, carrier)

    messageSent.Event(ctx, 1, req)
    err := invoker(ctx, method, req, reply, cc, callOpts...)
    messageReceived.Event(ctx, 1, reply)

    if err == nil {
      span.SetAttributes(statusCodeAttr(grpc_codes.OK))
      return nil
    }

    st, _ := status.FromError(err)
    span.SetStatus(codes.Error, st.Message())
    span.SetAttributes(statusCodeAttr(st.Code()))
    return err
  }
}

这样会生成两个 span(跨度):第一个包含服务端的请求和响应,第二个包含客户端的调用信息,但这两个 span 之间没有关联。

我应该怎么做,才能让这两个拦截器生成的 span 关联起来?

4

0 回答 0