我们正在构建一个 Graylog 输出插件,用于将数据发送到 Google Pub/Sub。以下是我们编写的代码,参考了 Google Pub/Sub 官方提供的示例代码(原文此处附有两个示例链接):
// Publishes String.valueOf(obj) to Pub/Sub as one message: loads service-account
// credentials from the file configured under CK_CREDENTIAL_FILE, builds a Publisher
// for topicName, publishes, and registers a result callback.
// Objects.requireNonNull fails fast with a NullPointerException when the configured
// path is null; try-with-resources closes the credential stream.
try (InputStream credential = new FileInputStream(Objects.requireNonNull(config.getString(CK_CREDENTIAL_FILE)))) {
// Wrap the parsed service-account key in a provider that always returns these credentials.
CredentialsProvider credentialsProvider = FixedCredentialsProvider
.create(ServiceAccountCredentials.fromStream(credential));
// endpoint can be set here
publisher = Publisher.newBuilder(topicName).setCredentialsProvider(credentialsProvider).build();
// The payload is the UTF-8 encoding of obj's string form.
ByteString finalData = ByteString.copyFromUtf8(String.valueOf(obj));
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(finalData)
.build();
// publish() returns a future for the message id; it is also kept in messageIdFutures.
ApiFuture<String> future = publisher.publish(pubsubMessage);
messageIdFutures.add(future);
// Report the outcome of this publish on stdout once the future completes.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
// ApiException carries a status code and a retryable flag; print both.
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + String.valueOf(obj));
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println(messageId);
}
},
// NOTE(review): directExecutor presumably runs the callback on the thread that completes the future — confirm this is acceptable here.
MoreExecutors.directExecutor());
// NOTE(review): this closing brace has no matching opener in this excerpt — presumably an enclosing loop header was elided; verify against the full method.
}
// Any failure (file access, credential parsing, publisher construction, publish) is only printed, not rethrown.
} catch (Exception e) {
e.printStackTrace();
} finally {
// Shut the publisher down whenever one was created.
if (publisher != null) {
try {
try {
publisher.shutdown();
} catch (Exception e) {
// A shutdown failure is printed and awaitTermination is still attempted.
e.printStackTrace();
}
// Wait up to one minute for the publisher to terminate.
publisher.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// NOTE(review): the interrupt is only printed; the thread's interrupt status is not restored here.
e.printStackTrace();
}
}
运行时,我们得到以下错误堆栈:
java.util.concurrent.ExecutionException: java.lang.IllegalAccessError: tried to access field io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY from class io.grpc.internal.CensusTracingModule$TracingClientInterceptor
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:423)
at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:90)
at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:68)
at org.plugin.PubSubOutput.writeBuffer(PubSubOutput.java:159)
at org.plugin.PubSubOutput.write(PubSubOutput.java:85)
at org.graylog2.buffers.processors.OutputBufferProcessor$1.run(OutputBufferProcessor.java:191)
at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:181)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
ERROR [AggregateFuture] - Input Future failed with Error - {}
java.lang.IllegalAccessError: tried to access field io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY from class io.grpc.internal.CensusTracingModule$TracingClientInterceptor
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor.interceptCall(CensusTracingModule.java:384) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor.interceptCall(CensusStatsModule.java:685) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.grpc.GrpcHeaderInterceptor.interceptCall(GrpcHeaderInterceptor.java:81) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor.interceptCall(GrpcMetadataHandlerInterceptor.java:55) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:766) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.grpc.ChannelPool.newCall(ChannelPool.java:77) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.grpc.GrpcClientCalls.newCall(GrpcClientCalls.java:88) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:81) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:317) [graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.cloud.pubsub.v1.Publisher.publishAllOutstanding(Publisher.java:306) [graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at com.google.cloud.pubsub.v1.Publisher$3.run(Publisher.java:280) [graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_222]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: java.lang.IllegalAccessError: tried to access field io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY from class io.grpc.internal.CensusTracingModule$TracingClientInterceptor
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor.interceptCall(CensusTracingModule.java:384)
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor.interceptCall(CensusStatsModule.java:685)
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
at com.google.api.gax.grpc.GrpcHeaderInterceptor.interceptCall(GrpcHeaderInterceptor.java:81)
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
at com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor.interceptCall(GrpcMetadataHandlerInterceptor.java:55)
at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:766)
at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63)
at com.google.api.gax.grpc.ChannelPool.newCall(ChannelPool.java:77)
at com.google.api.gax.grpc.GrpcClientCalls.newCall(GrpcClientCalls.java:88)
at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:81)
at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:317)
at com.google.cloud.pubsub.v1.Publisher.publishAllOutstanding(Publisher.java:306)
at com.google.cloud.pubsub.v1.Publisher$3.run(Publisher.java:280)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
我们甚至查看了 grpc-java 的 GitHub 仓库,但仍然无法找出问题所在。我们自己并没有直接使用 gRPC 进行通信,但我们认为 Google Pub/Sub 的 Java 客户端库在内部使用了它。
任何帮助将不胜感激。提前致谢。