1

我正在尝试用 gRPC 客户端调用 Stackdriver / Google Cloud Trace 服务,其接口定义见 https://github.com/googleapis/googleapis/blob/master/google/devtools/cloudtrace/v1/trace.proto

我发送的 protobuf 消息如下:

traces {
  traces {
    project_id: "brightcove-rna-master"
    trace_id: "A096D4956A424EEB98AE7863505B1E1F"
    spans {
      span_id: 1
      kind: RPC_CLIENT
      name: "/bigtableAapiGrpcstaging_aggregated/queryDataSetInternal/buildScan"
      start_time {
        seconds: 1459230665
        nanos: 705000000
      }
      end_time {
        seconds: 1459230666
        nanos: 416000000
      }
      labels {
        key: "videocloud/account"
        value: "4523394965001"
      }
      labels {
        key: "videocloud/protocol"
        value: "2"
      }
      labels {
        key: "videocloud/dimensions"
        value: "protocol,account"
      }
    }
    spans {
      span_id: 2
      kind: RPC_SERVER
      name: "/bigtableAapiGrpcstaging_aggregated/queryDataSetInternal/aggregateScan"
      start_time {
        seconds: 1459230666
        nanos: 420000000
      }
      end_time {
        seconds: 1459230667
        nanos: 753000000
      }
      labels {
        key: "videocloud/account"
        value: "4523394965001"
      }
      labels {
        key: "videocloud/protocol"
        value: "2"
      }
      labels {
        key: "videocloud/dimensions"
        value: "protocol,account"
      }
      labels {
        key: "bigtable/rowCount"
        value: "339"
      }
      labels {
        key: "bigtable/responseCount"
        value: "136"
      }
    }
    spans {
      kind: RPC_SERVER
      name: "/bigtableAapiGrpcstaging_aggregated/queryDataSetInternal"
      start_time {
        seconds: 1459230665
        nanos: 556000000
      }
      end_time {
        seconds: 1459230667
        nanos: 754000000
      }
      labels {
        key: "account"
        value: "4523394965001"
      }
    }
  }
}

但我得到的唯一响应是下面这个异常:

[WARN ] [2016-03-28 22:51:09,330] [grpc-default-executor-0] rna.api.server.ServerImpl    Unable to send trace to google
io.grpc.StatusRuntimeException: CANCELLED
    at io.grpc.Status.asRuntimeException(Status.java:431)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:358)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$3.runInContext(ClientCallImpl.java:462)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:54)
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:154)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

此外,我还尝试了下面的 ListTraces 请求:

// Issue a ListTraces call for the project, requesting the COMPLETE view of
// every trace between getEpoch() and getCurrentTime().
traceClient.listTraces(ListTracesRequest.newBuilder()
                .setProjectId(projectId)
                .setView(ListTracesRequest.ViewType.COMPLETE)
                .setStartTime(getEpoch())
                .setEndTime(getCurrentTime())
                .build());

并得到:

java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNIMPLEMENTED: GRPC target method is not implemented.
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:455)
    at com.brightcove.rna.api.server.ServerImpl.sendTraceAsync(ServerImpl.java:143)
    at com.brightcove.rna.api.server.ServerImpl.queryDataset(ServerImpl.java:116)
    at com.brightcove.rna.api.AnalyticsAPIGrpc$1.invoke(AnalyticsAPIGrpc.java:152)
    at com.brightcove.rna.api.AnalyticsAPIGrpc$1.invoke(AnalyticsAPIGrpc.java:147)
    at io.grpc.stub.ServerCalls$1$1.onHalfClose(ServerCalls.java:147)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:255)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$2.runInContext(ServerImpl.java:458)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:54)
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:154)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: UNIMPLEMENTED: GRPC target method is not implemented.
    at io.grpc.Status.asRuntimeException(Status.java:431)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:358)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$3.runInContext(ClientCallImpl.java:462)
    ... 5 more

我对这个 API 还很陌生,所以不知道自己哪里做错了。是不是漏掉了某个配置项之类的东西?

更新:下面是生成 span/trace 的代码。这段代码是一个 gRPC 服务,它接收请求并查询 Cloud Bigtable 表。我的目的是跟踪请求处理过程中的各个环节:

/**
 * gRPC service implementation that answers {@code queryDataset} calls by scanning a table
 * and, after each call, reports the timing spans it collected through a
 * {@link TraceServiceFutureClient}.
 *
 * <p>Each request produces one root span plus one child span per phase ("buildScan" and
 * "aggregateScan"). Span ids are numbered from 1 within a trace and every child span
 * references the root span as its parent.
 */
public class ServerImpl implements AnalyticsAPIGrpc.AnalyticsAPI {
    private static final Logger logger = Logger.getLogger(ServerImpl.class);

    // NOTE(review): none of these fields is assigned anywhere in this class as shown;
    // presumably they are initialised by code that is not part of this excerpt.
    private Connection _connection = null;
    private TraceServiceFutureClient traceClient;
    private String projectId;

    /**
     * Runs the query described by {@code request} and streams a single response back.
     *
     * <p>Whatever the outcome, the spans collected so far are handed to
     * {@link #sendTraceAsync(String, List)} in the {@code finally} block.
     *
     * @param request          the incoming query; its trace token is echoed on the response
     * @param responseObserver receives the built response, or the exception on failure
     */
    @Override
    public void queryDataset(APIRequest request, StreamObserver<APIResponse> responseObserver) {
        APIResponse.Builder response = APIResponse.newBuilder();
        List<TraceSpan> spans = Lists.newArrayList();
        if (request.getTraceToken() != null) {
            response.setTraceToken(request.getTraceToken());
        }

        try {
            spans = queryDataSetInternal(request, response);
            responseObserver.onNext(response.build());
            responseObserver.onCompleted();
        } catch (Exception ex) {
            responseObserver.onError(ex);
        } finally {
            // send trace
            sendTraceAsync(request.getTraceToken(), spans);
        }
    }

    /**
     * Sends {@code spans} as one trace via a PatchTraces call and logs the outcome.
     *
     * <p>Nothing is sent when there are no spans or when {@code traceId} is null or empty;
     * in that case an already-completed future is returned.
     *
     * @param traceId trace identifier; dashes are stripped and the rest upper-cased
     * @param spans   spans to attach to the trace
     * @return the future of the PatchTraces call, or an immediate future if nothing was sent
     */
    private ListenableFuture<Empty> sendTraceAsync(String traceId, List<TraceSpan> spans) {
        if (spans == null || spans.isEmpty()) {
            return Futures.immediateFuture(Empty.getDefaultInstance());
        }
        // This runs from a finally block: a missing trace id must not surface as an
        // exception there, and a trace without an id cannot be reported anyway.
        if (traceId == null || traceId.isEmpty()) {
            return Futures.immediateFuture(Empty.getDefaultInstance());
        }

        // Literal replacement; no regular expression is needed to drop the dashes.
        String normalizedTraceId = traceId.replace("-", "").toUpperCase();

        PatchTracesRequest patchTracesRequest = PatchTracesRequest.newBuilder()
                .setProjectId(projectId)
                .setTraces(Traces.newBuilder().addTraces(
                        Trace.newBuilder()
                                .setProjectId(projectId)
                                .setTraceId(normalizedTraceId)
                                .addAllSpans(spans)))
                .build();

        if (logger.isTraceEnabled()) {
            logger.trace("Sending trace: " + patchTracesRequest.toString());
        }

        ListenableFuture<Empty> future = traceClient.patchTraces(patchTracesRequest);
        // add callback for logging result
        Futures.addCallback(future, new FutureCallback<Empty>() {
            @Override
            public void onSuccess(@Nullable Empty result) {
                logger.trace("Trace successfully sent to google");
            }

            @Override
            public void onFailure(Throwable t) {
                logger.warn("Unable to send trace to google", t);
            }
        });
        return future;
    }

    /** Returns the connection held by this instance. */
    private Connection getConnection() throws IOException {
        return this._connection;
    }

    /**
     * Builds the scan for {@code request}'s account and start/end time against
     * {@code resourceName}.
     */
    private Scan createScan(APIRequest request, String resourceName) {
        return ScanBuilder.of(
                request.getAccount(),
                resourceName,
                request.getStartTime(), request.getEndTime())
                .build();
    }

    /**
     * Executes the scan for {@code request}, copies the results into {@code response} and
     * records one span per phase.
     *
     * @param request  the query to execute
     * @param response builder that receives the rows, status and details
     * @return the child spans followed by the root span
     * @throws IOException if accessing the table or scanner fails
     */
    private List<TraceSpan> queryDataSetInternal(APIRequest request, APIResponse.Builder response) throws IOException {
        // Span ids start at 1: 0 is the protobuf default for the field and, per the
        // Cloud Trace v1 API, is not a valid span id.
        long nextSpanId = 1L;
        String parentTraceName = "/api-qa/queryDataSetInternal";
        TraceSpan.Builder parentSpan =
                TraceSpan.newBuilder()
                        .setSpanId(nextSpanId++)
                        .setStartTime(getCurrentTime())
                        .setKind(TraceSpan.SpanKind.RPC_SERVER)
                        .setName(parentTraceName)
                        .putAllLabels(ImmutableMap.of("account", request.getAccount()));

        Connection connection = this.getConnection();
        List<TraceSpan> traceSpanList = Lists.newArrayList();
        try (Table table = connection.getTable("tableName")) {
            /// create scan ///
            TraceSpan.Builder scanSpan = TraceSpan.newBuilder()
                    .setSpanId(nextSpanId++)
                    .setKind(TraceSpan.SpanKind.RPC_CLIENT)
                    .setName(parentTraceName + "/buildScan")
                    // Link to the root span itself, not to the root's (unset) parent.
                    .setParentSpanId(parentSpan.getSpanId())
                    .setStartTime(getCurrentTime());

            String dimensionStr = Joiner.on(',').join(request.getDimensionsList());
            scanSpan.putAllLabels(ImmutableMap.of(
                    "videocloud/account",    request.getAccount(),
                    "videocloud/protocol",   request.getProtocol(),
                    "videocloud/dimensions", dimensionStr));

            Scan scan = createScan(request, getResourceName(request));
            logger.debug("Using bigtable scan: " + scan.toJSON());

            // The scanner is closed when this block exits, on success or failure.
            try (ResultScanner scanner = table.getScanner(scan)) {
                // The buildScan span ends once the scanner has been opened.
                traceSpanList.add(scanSpan.setEndTime(getCurrentTime()).build());

                /// perform aggregation ///
                Timestamp startTS = getCurrentTime();
                List<Result> results = StreamSupport.stream(scanner.spliterator(), false)
                                                    .collect(Collectors.toList());
                response.addAllRows(results);
                // Number of results read from the scanner for this request.
                int rowCount = results.size();

                TraceSpan.Builder aggregateSpan = TraceSpan.newBuilder()
                        .setSpanId(nextSpanId++)
                        .setKind(TraceSpan.SpanKind.RPC_SERVER)
                        .setName(parentTraceName + "/aggregateScan")
                        .setParentSpanId(parentSpan.getSpanId())
                        .setStartTime(startTS)
                        .setEndTime(getCurrentTime())
                        .putAllLabels(ImmutableMap.of(
                                "videocloud/account",     request.getAccount(),
                                "videocloud/protocol",    request.getProtocol(),
                                "videocloud/dimensions",  dimensionStr,
                                "bigtable/rowCount",      String.valueOf(rowCount),
                                "bigtable/responseCount", String.valueOf(response.getRowsCount())));
                traceSpanList.add(aggregateSpan.build());

                response.setStatus(APIResponse.Status.OK)
                        .setDetails(String.format("Processed %d results from BigTable", rowCount));
            }
        } finally {
            // Close the root span even on failure so its end time is always set.
            parentSpan.setEndTime(getCurrentTime());
            traceSpanList.add(parentSpan.build());
        }
        return traceSpanList;
    }
}
4

1 回答 1

1

Ankur,我已经在 Cloud Bigtable 的 GitHub 仓库中为此提交了一个 issue。

于 2016-04-12T12:37:05.450 回答