0

我们正在构建一个 Graylog 输出插件,用于将数据发送到 Google Pub/Sub。下面是我们编写的代码,它参考了 Google Pub/Sub 官方提供的样板代码(this、this):

try (InputStream credential = new FileInputStream(Objects.requireNonNull(config.getString(CK_CREDENTIAL_FILE)))) {
                CredentialsProvider credentialsProvider = FixedCredentialsProvider
                        .create(ServiceAccountCredentials.fromStream(credential));
                // endpoint can be set here
                publisher = Publisher.newBuilder(topicName).setCredentialsProvider(credentialsProvider).build();

                ByteString finalData = ByteString.copyFromUtf8(String.valueOf(obj));
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                        .setData(finalData)
                        .build();
                ApiFuture<String> future = publisher.publish(pubsubMessage);
                messageIdFutures.add(future);
                ApiFutures.addCallback(
                        future,
                        new ApiFutureCallback<String>() {

                            @Override
                            public void onFailure(Throwable throwable) {
                                if (throwable instanceof ApiException) {
                                    ApiException apiException = ((ApiException) throwable);
                                    // details on the API exception
                                    System.out.println(apiException.getStatusCode().getCode());
                                    System.out.println(apiException.isRetryable());
                                }
                                System.out.println("Error publishing message : " + String.valueOf(obj));
                            }

                            @Override
                            public void onSuccess(String messageId) {
                                // Once published, returns server-assigned message ids (unique within the topic)
                                System.out.println(messageId);
                            }
                        },
                        MoreExecutors.directExecutor());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (publisher != null) {
                try {
                    try {
                        publisher.shutdown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    publisher.awaitTermination(1, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

运行时,我们得到以下错误堆栈:

java.util.concurrent.ExecutionException: java.lang.IllegalAccessError: tried to access field io.opencensus.trace.unsafe.
ContextUtils.CONTEXT_SPAN_KEY from class io.grpc.internal.CensusTracingModule$TracingClientInterceptor
        at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
        at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:423)
        at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:90)
        at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:68)
        at org.plugin.PubSubOutput.writeBuffer(PubSubOutput.java:159)
        at org.plugin.PubSubOutput.write(PubSubOutput.java:85)
        at org.graylog2.buffers.processors.OutputBufferProcessor$1.run(OutputBufferProcessor.java:191)
        at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:18
1)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


 ERROR   [AggregateFuture] - Input Future failed with Error - {}
java.lang.IllegalAccessError: tried to access field io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY from class
io.grpc.internal.CensusTracingModule$TracingClientInterceptor
        at io.grpc.internal.CensusTracingModule$TracingClientInterceptor.interceptCall(CensusTracingModule.java:384) ~[g
raylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-ou
tput-1.0.0-SNAPSHOT.jar:?]
        at io.grpc.internal.CensusStatsModule$StatsClientInterceptor.interceptCall(CensusStatsModule.java:685) ~[graylog
-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-ou
tput-1.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.grpc.GrpcHeaderInterceptor.interceptCall(GrpcHeaderInterceptor.java:81) ~[graylog-plugin-p
ubsub-output-1.0.0-SNAPSHOT.jar:?]
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-ou
tput-1.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor.interceptCall(GrpcMetadataHandlerInterceptor.java:55)
~[graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.jar:?]
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156) ~[graylog-plugin-pubsub-ou
tput-1.0.0-SNAPSHOT.jar:?]
        at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:766) ~[graylog-plugin-pubsub-output-1.0.0
-SNAPSHOT.jar:?]
        at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63) ~[graylog-plugin-pubsub-o
utput-1.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.grpc.ChannelPool.newCall(ChannelPool.java:77) ~[graylog-plugin-pubsub-output-1.0.0-SNAPSHO
T.jar:?]
        at com.google.api.gax.grpc.GrpcClientCalls.newCall(GrpcClientCalls.java:88) ~[graylog-plugin-pubsub-output-1.0.0
-SNAPSHOT.jar:?]
        at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) ~[graylog-plugin-pubsub-out
put-1.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) ~[graylog-plugin-pubs
ub-output-1.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:81) ~[graylog-plugin-pubsub-output-1.0.0-SNA
PSHOT.jar:?]
        at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) ~[graylog-plugin-pubsub-output-1
.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) ~[graylog-plugin-pubsub-output-1
.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) ~[graylog-plugin-pubsub-output-1
.0.0-SNAPSHOT.jar:?]
        at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) ~[graylog-plugin-pubsub-output-1.0.
0-SNAPSHOT.jar:?]
        at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) ~[graylog-plugin-pubsub-output-1.0.0-S
NAPSHOT.jar:?]
        at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:317) [graylog-plugin-pubsub-outpu
t-1.0.0-SNAPSHOT.jar:?]
        at com.google.cloud.pubsub.v1.Publisher.publishAllOutstanding(Publisher.java:306) [graylog-plugin-pubsub-output-
1.0.0-SNAPSHOT.jar:?]
        at com.google.cloud.pubsub.v1.Publisher$3.run(Publisher.java:280) [graylog-plugin-pubsub-output-1.0.0-SNAPSHOT.j
ar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_222]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.j
ava:180) [?:1.8.0_222]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293
) [?:1.8.0_222]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: java.lang.IllegalAccessError: tried to access field io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY
from class io.grpc.internal.CensusTracingModule$TracingClientInterceptor
        at io.grpc.internal.CensusTracingModule$TracingClientInterceptor.interceptCall(CensusTracingModule.java:384)
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
        at io.grpc.internal.CensusStatsModule$StatsClientInterceptor.interceptCall(CensusStatsModule.java:685)
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
        at com.google.api.gax.grpc.GrpcHeaderInterceptor.interceptCall(GrpcHeaderInterceptor.java:81)
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
        at com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor.interceptCall(GrpcMetadataHandlerInterceptor.java:55)
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
        at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:766)
        at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63)
        at com.google.api.gax.grpc.ChannelPool.newCall(ChannelPool.java:77)
        at com.google.api.gax.grpc.GrpcClientCalls.newCall(GrpcClientCalls.java:88)
        at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
        at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
        at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:81)
        at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
        at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
        at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
        at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
        at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
        at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:317)
        at com.google.cloud.pubsub.v1.Publisher.publishAllOutstanding(Publisher.java:306)
        at com.google.cloud.pubsub.v1.Publisher$3.run(Publisher.java:280)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.j
ava:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293
)

我们甚至查看了 grpc-java 的 GitHub 仓库,但仍然无法找出问题所在。我们自己并没有直接使用 gRPC 进行通信,但我们认为 Google Pub/Sub 的 Java 客户端库在内部使用了它。

任何帮助将不胜感激。提前致谢。

4

1 回答 1

0

我遇到了同样的问题。我的应用程序是用 Scala 开发的(使用 sbt 构建),但使用 Maven 时的解决办法相同。这类 IllegalAccessError 通常是由类路径上 gRPC 与 OpenCensus 的版本不一致引起的。为了解决这个问题,我把依赖改成了以下版本:

// Google client library versions pinned together; the answer reports that
// this combination resolved the IllegalAccessError shown in the question.
libraryDependencies ++= Seq(
  "com.google.cloud" % "google-cloud-pubsub" % "1.75.0",
  "com.google.api-client" % "google-api-client" % "1.30.5",
  "com.google.cloud" % "google-cloud-core-grpc" % "1.91.3"
)

至于代码,只需保留创建 Google 凭据对象的默认方式即可:

/** Publishes `event` as a single UTF-8 message to a Google Pub/Sub topic.
  *
  * Credentials are loaded from the key file referenced by `creds`, which is
  * defined outside this method, and scoped to the cloud-platform OAuth scope.
  * A new `Publisher` is built for the call and is always shut down before the
  * method returns, including when publishing throws.
  *
  * @param projectId id of the Google Cloud project that owns the topic
  * @param topic     name of the topic inside that project
  * @param event     message payload; it is encoded as UTF-8 bytes
  */
def publishMessages(projectId : String, topic : String, event : String): Unit = {
  val topicName = ProjectTopicName.of(projectId, topic)
  // Kept from the original snippet. Nothing in this method waits on these
  // futures, so a failed publish is not reported to the caller here.
  val messageIdFutures = new util.ArrayList[ApiFuture[String]]

  // Close the key-file stream as soon as the credentials are parsed; the
  // original never closed it.
  val credentialStream = new FileInputStream(creds)
  val credentials =
    try {
      GoogleCredentials.fromStream(credentialStream).createScoped(
        java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform")
      )
    } finally {
      credentialStream.close()
    }

  // Built outside the try so it can be an immutable val instead of a
  // null-initialised var.
  val publisher = Publisher.newBuilder(topicName)
    .setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build

  try {
    val payload = ByteString.copyFromUtf8(event)
    val pubsubMessage = PubsubMessage.newBuilder.setData(payload).build
    val messageIdFuture = publisher.publish(pubsubMessage)
    messageIdFutures.add(messageIdFuture)
  } finally {
    // Release the publisher's background resources; the original never shut
    // it down. The one-minute bound mirrors the Java code in the question.
    publisher.shutdown()
    publisher.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)
  }
}
于 2019-11-25T16:20:12.950 回答