我正在尝试将 grpc 客户端用于https://github.com/googleapis/googleapis/blob/master/google/devtools/cloudtrace/v1/trace.proto上的 Stackdriver/google 云跟踪定义
我正在发送 protobuf
traces {
traces {
project_id: "brightcove-rna-master"
trace_id: "A096D4956A424EEB98AE7863505B1E1F"
spans {
span_id: 1
kind: RPC_CLIENT
name: "/bigtableAapiGrpcstaging_aggregated/queryDataSetInternal/buildScan"
start_time {
seconds: 1459230665
nanos: 705000000
}
end_time {
seconds: 1459230666
nanos: 416000000
}
labels {
key: "videocloud/account"
value: "4523394965001"
}
labels {
key: "videocloud/protocol"
value: "2"
}
labels {
key: "videocloud/dimensions"
value: "protocol,account"
}
}
spans {
span_id: 2
kind: RPC_SERVER
name: "/bigtableAapiGrpcstaging_aggregated/queryDataSetInternal/aggregateScan"
start_time {
seconds: 1459230666
nanos: 420000000
}
end_time {
seconds: 1459230667
nanos: 753000000
}
labels {
key: "videocloud/account"
value: "4523394965001"
}
labels {
key: "videocloud/protocol"
value: "2"
}
labels {
key: "videocloud/dimensions"
value: "protocol,account"
}
labels {
key: "bigtable/rowCount"
value: "339"
}
labels {
key: "bigtable/responseCount"
value: "136"
}
}
spans {
kind: RPC_SERVER
name: "/bigtableAapiGrpcstaging_aggregated/queryDataSetInternal"
start_time {
seconds: 1459230665
nanos: 556000000
}
end_time {
seconds: 1459230667
nanos: 754000000
}
labels {
key: "account"
value: "4523394965001"
}
}
}
}
但我得到的唯一回报是这个例外:
[WARN ] [2016-03-28 22:51:09,330] [grpc-default-executor-0] rna.api.server.ServerImpl Unable to send trace to google
io.grpc.StatusRuntimeException: CANCELLED
at io.grpc.Status.asRuntimeException(Status.java:431)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:358)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$3.runInContext(ClientCallImpl.java:462)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:54)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:154)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
同样,我尝试了以下 ListTraces 请求
traceClient.listTraces(ListTracesRequest.newBuilder()
.setProjectId(projectId)
.setView(ListTracesRequest.ViewType.COMPLETE)
.setStartTime(getEpoch())
.setEndTime(getCurrentTime())
.build());
并得到:
java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNIMPLEMENTED: GRPC target method is not implemented.
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:455)
at com.brightcove.rna.api.server.ServerImpl.sendTraceAsync(ServerImpl.java:143)
at com.brightcove.rna.api.server.ServerImpl.queryDataset(ServerImpl.java:116)
at com.brightcove.rna.api.AnalyticsAPIGrpc$1.invoke(AnalyticsAPIGrpc.java:152)
at com.brightcove.rna.api.AnalyticsAPIGrpc$1.invoke(AnalyticsAPIGrpc.java:147)
at io.grpc.stub.ServerCalls$1$1.onHalfClose(ServerCalls.java:147)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:255)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$2.runInContext(ServerImpl.java:458)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:54)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:154)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: UNIMPLEMENTED: GRPC target method is not implemented.
at io.grpc.Status.asRuntimeException(Status.java:431)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:358)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$3.runInContext(ClientCallImpl.java:462)
... 5 more
我对 API 很陌生,所以我不知道我在这里做错了什么。是否有我错过的配置值或类似的东西。
更新生成跨度/跟踪的代码。该代码表示获取请求并查询 Cloud BigTable 表的 Grpc 服务。我的目的是跟踪请求的不同方面:
public class ServerImpl implements AnalyticsAPIGrpc.AnalyticsAPI {
private static final Logger logger = Logger.getLogger(ServerImpl.class);
private Connection _connection = null;
private TraceServiceFutureClient traceClient;
private String projectId;
@Override
public void queryDataset(APIRequest request, StreamObserver<APIResponse> responseObserver) {
APIResponse.Builder response = APIResponse.newBuilder();
List<TraceSpan> spans = Lists.newArrayList();
if (request.getTraceToken() != null) {
response.setTraceToken(request.getTraceToken());
}
try {
spans = queryDataSetInternal(request, response);
responseObserver.onNext(response.build());
responseObserver.onCompleted();
} catch (Exception ex) {
responseObserver.onError(ex);
} finally {
// send trace
sendTraceAsync(request.getTraceToken(), spans);
}
}
private ListenableFuture<Empty> sendTraceAsync(String traceId, List<TraceSpan> spans) {
if (spans == null || spans.isEmpty()) {
return Futures.immediateFuture(Empty.getDefaultInstance());
}
PatchTracesRequest patchTracesRequest = PatchTracesRequest.newBuilder()
.setProjectId(projectId)
.setTraces(Traces.newBuilder().addTraces(
Trace.newBuilder()
.setProjectId(projectId)
.setTraceId(traceId.replaceAll("-", "").toUpperCase())
.addAllSpans(spans)))
.build();
if (logger.isTraceEnabled()) {
logger.trace("Sending trace: " + patchTracesRequest.toString());
}
ListenableFuture<Empty> future = traceClient.patchTraces(patchTracesRequest);
// add callback for logging result
Futures.addCallback(future, new FutureCallback<Empty>() {
@Override
public void onSuccess(@Nullable Empty result) {
logger.trace("Trace successfully sent to google");
}
@Override
public void onFailure(Throwable t) {
logger.warn("Unable to send trace to google", t);
}
});
return future;
}
private Connection getConnection() throws IOException {
return this._connection;
}
private Scan createScan(APIRequest request, String resourceName) {
return ScanBuilder.of(
request.getAccount(),
resourceName,
request.getStartTime(), request.getEndTime())
.build();
}
private List<TraceSpan> queryDataSetInternal(APIRequest request, APIResponse.Builder response) throws IOException {
AtomicLong spanIdCounter = new AtomicLong(0L);
String parentTraceName = "/api-qa/queryDataSetInternal";
TraceSpan.Builder parentSpan =
TraceSpan.newBuilder()
.setSpanId(spanIdCounter.getAndIncrement())
.setStartTime(getCurrentTime())
.setKind(TraceSpan.SpanKind.RPC_SERVER)
.setName(parentTraceName)
.putAllLabels(ImmutableMap.of("account", request.getAccount()));
Connection connection = this.getConnection();
List<TraceSpan> traceSpanList = Lists.newArrayList();
try (Table table = connection.getTable("tableName")) {
/// create scan ///
TraceSpan.Builder traceSpan = TraceSpan.newBuilder()
.setSpanId(spanIdCounter.getAndIncrement())
.setKind(TraceSpan.SpanKind.RPC_CLIENT)
.setName(parentTraceName + "/buildScan")
.setParentSpanId(parentSpan.getParentSpanId())
.setStartTime(getCurrentTime());
AtomicInteger count = new AtomicInteger(0);
// add trace span
String dimensionStr = Joiner.on(',').join(request.getDimensionsList());
traceSpan.putAllLabels(ImmutableMap.of(
"videocloud/account", request.getAccount(),
"videocloud/protocol", request.getProtocol(),
"videocloud/dimensions", dimensionStr));
// scan the response and send a stream of rows back
Scan scan = createScan(request, getResourceName(request));
logger.debug("Using bigtable scan: " + scan.toJSON());
ResultScanner scanner = table.getScanner(scan);
// record trace
traceSpanList.add(traceSpan.setEndTime(getCurrentTime()).build());
/// perform aggregation ///
Timestamp startTS = getCurrentTime();
List<Result> results = StreamSupport.stream(scanner.spliterator(), false)
.collect(Collectors.toList());
response.addAllRows(results);
// record trace
traceSpan = TraceSpan.newBuilder()
.setSpanId(spanIdCounter.getAndIncrement())
.setKind(TraceSpan.SpanKind.RPC_SERVER)
.setName(parentTraceName + "/aggregateScan")
.setParentSpanId(parentSpan.getParentSpanId())
.setStartTime(startTS)
.setEndTime(getCurrentTime())
.putAllLabels(ImmutableMap.of(
"videocloud/account", request.getAccount(),
"videocloud/protocol", request.getProtocol(),
"videocloud/dimensions", dimensionStr,
"bigtable/rowCount", String.valueOf(count.get()),
"bigtable/responseCount", String.valueOf(response.getRowsCount())));
traceSpanList.add(traceSpan.build());
response.setStatus(APIResponse.Status.OK)
.setDetails(String.format("Processed %d results from BigTable", count.get()));
} finally {
parentSpan.setEndTime(getCurrentTime());
traceSpanList.add(parentSpan.build());
}
return traceSpanList;
}
}