We are developing a Spring Boot application that subscribes to a Google Pub/Sub topic. Here is my code:
pom.xml with the GCP Spring Boot dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>1.2.5.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
application.yml file
spring:
  cloud:
    gcp:
      project-id: project-dev
      credentials:
        location: file:C:/Users/project-dev.json
gcp:
  pubsub:
    subscription: dev-topic-sub
Configuration class that defines the beans
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class GCPPubSubConfig {

    @Value("${gcp.pubsub.subscription}")
    private String subscriptionId;

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("gcpChannel") MessageChannel inputChannel, PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionId);
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.AUTO);
        adapter.setPayloadType(String.class);
        return adapter;
    }

    @Bean
    public MessageChannel gcpChannel() {
        return new DirectChannel();
    }
}
Here is my listener class
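import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class GCPPubSubSubscriber {

    // Receives each message that the inbound adapter puts on gcpChannel
    @ServiceActivator(inputChannel = "gcpChannel")
    public void messageReceiver(String payload) {
        log.info(payload);
    }
}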
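The code above works fine on my local system. I can start the application, subscribe to the channel, receive the messages in my ServiceActivator method, and print them.
When I deploy it to the Azure cloud, the application starts and subscribes to the channel, but while listening for messages it throws the following exception: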
08:10:42.213 INFO [,,,] 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8085 (http) with context path ''
08:10:42.233 INFO [,,,] 1 --- [ main] .a.e.t.d.t.b.Application : Started Application in 49.345 seconds (JVM running for 52.997)
08:10:43.098 ERROR [,,,] 1 --- [bscriber-SE-1-4] c.g.c.p.v.StreamingSubscriberConnection : terminated streaming with exception
com.google.api.gax.rpc.UnknownException: com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: [WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:47) ~[gax-1.49.1.jar!/:1.49.1]
at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1.onFailure(StreamingSubscriberConnection.java:238) ~[google-cloud-pubsub-1.101.0.jar!/:1.101.0]
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) [api-common-1.8.1.jar!/:na]
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1083) [guava-29.0-jre.jar!/:na]
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) [guava-29.0-jre.jar!/:na]
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1174) [guava-29.0-jre.jar!/:na]
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969) [guava-29.0-jre.jar!/:na]
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760) [guava-29.0-jre.jar!/:na]
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95) [api-common-1.8.1.jar!/:na]
at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77) [api-common-1.8.1.jar!/:na]
at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52) [api-common-1.8.1.jar!/:na]
at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:174) [google-cloud-pubsub-1.101.0.jar!/:1.101.0]
at com.google.api.gax.tracing.TracedResponseObserver.onError(TracedResponseObserver.java:103) [gax-1.49.1.jar!/:1.49.1]
at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:84) [gax-grpc-1.49.1.jar!/:1.49.1]
at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:86) [gax-1.49.1.jar!/:1.49.1]
at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:149) [gax-grpc-1.49.1.jar!/:1.49.1]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.30.2.jar!/:1.30.2]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) [grpc-core-1.30.2.jar!/:1.30.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_275]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_275]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_275]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_275]
Caused by: com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: [WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:47) ~[gax-1.49.1.jar!/:1.49.1]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[gax-grpc-1.49.1.jar!/:1.49.1]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[gax-grpc-1.49.1.jar!/:1.49.1]
at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82) [gax-grpc-1.49.1.jar!/:1.49.1]
... 17 common frames omitted
Caused by: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: [WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.30.2.jar!/:1.30.2]
... 16 common frames omitted
Caused by: io.grpc.netty.shaded.io.netty.channel.ChannelPipelineException: io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ClientTlsHandler.handlerAdded() has thrown an exception; removed.
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:624) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.replace(DefaultChannelPipeline.java:572) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.replace(DefaultChannelPipeline.java:515) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.fireProtocolNegotiationEvent(ProtocolNegotiators.java:767) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$WaitUntilActiveHandler.channelActive(ProtocolNegotiators.java:676) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
... 1 common frames omitted
Caused by: java.lang.RuntimeException: ALPN unsupported. Is your classpath configured correctly? For Conscrypt, add the appropriate Conscrypt JAR to classpath and set the security provider. For Jetty-ALPN, see http://www.eclipse.org/jetty/documentation/current/alpn-chapter.html#alpn-starting
at io.grpc.netty.shaded.io.netty.handler.ssl.JdkAlpnApplicationProtocolNegotiator$FailureWrapper.wrapSslEngine(JdkAlpnApplicationProtocolNegotiator.java:122) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.handler.ssl.JdkSslContext.configureAndWrapEngine(JdkSslContext.java:360) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.handler.ssl.JdkSslContext.newEngine(JdkSslContext.java:335) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ClientTlsHandler.handlerAdded0(ProtocolNegotiators.java:348) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.handlerAdded(ProtocolNegotiators.java:726) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:971) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
... 20 common frames omitted
Based on the exception, I suspected a problem with the Java version, so I checked the details.
Java version details on my local system:
Desktop>java -version
openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)
Java version details in Azure:
/home/aksuser $ java -version
openjdk version "1.8.0_275"
OpenJDK Runtime Environment (IcedTea 3.17.1) (Alpine 8.275.01-r0)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)
/home/aksuser $
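Could the Java version be the problem? The application works fine locally but fails in Azure. Why does ALPN come into play when using GCP? If the problem is a Java version that doesn't support ALPN, what are the options for resolving this error?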
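To narrow this down, a small check like the one below could be run in both environments and the output compared. It is only a sketch: the class name AlpnCheck and the helper hasMethod are made up for illustration and are not part of the application. It assumes that the standard JDK ALPN API (SSLParameters.setApplicationProtocols and SSLEngine.getApplicationProtocol) is what matters here; as far as I know these methods exist on Java 9+ and were backported to OpenJDK 8u252+. It does not show which TLS provider the shaded gRPC Netty transport actually picks.

```java
import java.security.Provider;
import java.security.Security;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

// Hypothetical diagnostic class, not part of the application above.
public class AlpnCheck {

    // Returns true if the given class exposes a public method with this name and parameter types.
    public static boolean hasMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Basic runtime details, to compare the local machine with the Azure container
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("java.vendor  = " + System.getProperty("java.vendor"));
        System.out.println("os.name      = " + System.getProperty("os.name"));

        // Assumption: these two methods indicate that the JDK ships the standard ALPN API
        System.out.println("SSLParameters.setApplicationProtocols present: "
                + hasMethod(SSLParameters.class, "setApplicationProtocols", String[].class));
        System.out.println("SSLEngine.getApplicationProtocol present: "
                + hasMethod(SSLEngine.class, "getApplicationProtocol"));

        // Registered security providers (for example, whether Conscrypt is installed)
        for (Provider provider : Security.getProviders()) {
            System.out.println("security provider: " + provider.getName());
        }
    }
}
```

If both methods are reported as present in Azure as well, the JDK version alone is probably not the whole explanation, and the difference between the two runtimes (Windows desktop versus an Alpine-based image) would be the next thing to compare.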