1

我们正在开发一个订阅 Google pub-sub 主题的 Spring Boot 应用程序。下面是我的代码 -

pom.xml 具有 GCP-Spring Boot 相关依赖项

<!-- Spring Integration starter; no version is declared in this snippet (presumably managed by the Spring Boot parent/BOM - confirm). -->
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <!-- Spring Cloud GCP Pub/Sub starter; no version is declared here, see the imported spring-cloud-gcp-dependencies entry below. -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependencyManagement>
        <dependencies>
            <!-- Imports the spring-cloud-gcp-dependencies POM (type=pom, scope=import) at 1.2.5.RELEASE. -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-dependencies</artifactId>
                <version>1.2.5.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

application.yml 文件

# Spring Cloud GCP settings.
spring:
  cloud:
    gcp:
      # GCP project identifier.
      project-id: project-dev
      credentials:
        # NOTE(review): Windows-style absolute path to a JSON credentials file;
        # presumably replaced/overridden in the deployed (Linux) environment - confirm.
        location: file:C:/Users/project-dev.json
# Application-specific property, read through @Value("${gcp.pubsub.subscription}") in GCPPubSubConfig.
gcp:
  pubsub:
    subscription: dev-topic-sub

定义bean的配置类

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

/**
 * Spring configuration that connects a Pub/Sub subscription to a Spring Integration
 * message channel.
 *
 * <p>Two beans are declared: the {@code gcpChannel} channel and an inbound channel
 * adapter that forwards messages from the configured subscription into that channel.
 */
@Configuration
public class GCPPubSubConfig {

    /** Subscription name, injected from the {@code gcp.pubsub.subscription} property. */
    @Value("${gcp.pubsub.subscription}")
    private String subscriptionId;

    /**
     * Builds the inbound adapter for the configured subscription.
     *
     * @param inputChannel   the {@code gcpChannel} bean; set as the adapter's output channel
     * @param pubSubTemplate template handed to the adapter's constructor
     * @return an adapter configured with {@link AckMode#AUTO} and a {@link String} payload type
     */
    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("gcpChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        final PubSubInboundChannelAdapter inboundAdapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionId);

        // Plain, independent setters: payload conversion target, ack strategy, destination.
        inboundAdapter.setPayloadType(String.class);
        inboundAdapter.setAckMode(AckMode.AUTO);
        inboundAdapter.setOutputChannel(inputChannel);

        return inboundAdapter;
    }

    /**
     * Declares the channel the inbound adapter writes to.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel gcpChannel() {
        final MessageChannel channel = new DirectChannel();
        return channel;
    }
}

这是我的监听器类

/**
 * Listener component that handles payloads delivered on the {@code gcpChannel} channel.
 */
@Slf4j
@Component
public class GCPPubSubSubscriber {

    /**
     * Writes the received payload to the log at INFO level.
     *
     * @param payload the message payload as a string
     */
    @ServiceActivator(inputChannel = "gcpChannel")
    public void messageReceiver(final String payload) {
        log.info(payload);
    }
}

上面的代码在我的本地环境中运行正常:应用程序可以启动并订阅通道,我的 ServiceActivator 方法也能接收到消息并将其打印出来。

当我将它部署到 Azure 云时,应用程序能够启动并订阅通道,但在接收消息时出现以下异常:

08:10:42.213  INFO [,,,] 1 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8085 (http) with context path ''
 08:10:42.233  INFO [,,,] 1 --- [           main] .a.e.t.d.t.b.Application : Started Application in 49.345 seconds (JVM running for 52.997)
 08:10:43.098 ERROR [,,,] 1 --- [bscriber-SE-1-4] c.g.c.p.v.StreamingSubscriberConnection  : terminated streaming with exception

com.google.api.gax.rpc.UnknownException: com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: [WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:47) ~[gax-1.49.1.jar!/:1.49.1]
        at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1.onFailure(StreamingSubscriberConnection.java:238) ~[google-cloud-pubsub-1.101.0.jar!/:1.101.0]
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) [api-common-1.8.1.jar!/:na]
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1083) [guava-29.0-jre.jar!/:na]
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) [guava-29.0-jre.jar!/:na]
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1174) [guava-29.0-jre.jar!/:na]
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969) [guava-29.0-jre.jar!/:na]
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760) [guava-29.0-jre.jar!/:na]
        at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95) [api-common-1.8.1.jar!/:na]
        at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77) [api-common-1.8.1.jar!/:na]
        at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52) [api-common-1.8.1.jar!/:na]
        at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:174) [google-cloud-pubsub-1.101.0.jar!/:1.101.0]
        at com.google.api.gax.tracing.TracedResponseObserver.onError(TracedResponseObserver.java:103) [gax-1.49.1.jar!/:1.49.1]
        at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:84) [gax-grpc-1.49.1.jar!/:1.49.1]
        at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:86) [gax-1.49.1.jar!/:1.49.1]
        at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:149) [gax-grpc-1.49.1.jar!/:1.49.1]
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.30.2.jar!/:1.30.2]
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) [grpc-core-1.30.2.jar!/:1.30.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_275]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_275]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_275]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_275]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_275]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_275]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_275]
Caused by: com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: [WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:47) ~[gax-1.49.1.jar!/:1.49.1]
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[gax-grpc-1.49.1.jar!/:1.49.1]
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[gax-grpc-1.49.1.jar!/:1.49.1]
        at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82) [gax-grpc-1.49.1.jar!/:1.49.1]
        ... 17 common frames omitted
Caused by: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: [WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
        at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.30.2.jar!/:1.30.2]
        ... 16 common frames omitted
Caused by: io.grpc.netty.shaded.io.netty.channel.ChannelPipelineException: io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ClientTlsHandler.handlerAdded() has thrown an exception; removed.
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:624) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.replace(DefaultChannelPipeline.java:572) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.replace(DefaultChannelPipeline.java:515) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.fireProtocolNegotiationEvent(ProtocolNegotiators.java:767) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$WaitUntilActiveHandler.channelActive(ProtocolNegotiators.java:676) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        ... 1 common frames omitted
Caused by: java.lang.RuntimeException: ALPN unsupported. Is your classpath configured correctly? For Conscrypt, add the appropriate Conscrypt JAR to classpath and set the security provider. For Jetty-ALPN, see http://www.eclipse.org/jetty/documentation/current/alpn-chapter.html#alpn-starting
        at io.grpc.netty.shaded.io.netty.handler.ssl.JdkAlpnApplicationProtocolNegotiator$FailureWrapper.wrapSslEngine(JdkAlpnApplicationProtocolNegotiator.java:122) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.handler.ssl.JdkSslContext.configureAndWrapEngine(JdkSslContext.java:360) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.handler.ssl.JdkSslContext.newEngine(JdkSslContext.java:335) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ClientTlsHandler.handlerAdded0(ProtocolNegotiators.java:348) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.handlerAdded(ProtocolNegotiators.java:726) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:971) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) ~[grpc-netty-shaded-1.30.2.jar!/:1.30.2]
        ... 20 common frames omitted

根据异常,我怀疑 Java 版本可能存在问题,因此检查了详细信息

我本地系统中的 Java 版本详细信息 -

Desktop>java -version
openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)

Azure 中的 Java 版本详细信息 -

/home/aksuser $ java -version
openjdk version "1.8.0_275"
OpenJDK Runtime Environment (IcedTea 3.17.1) (Alpine 8.275.01-r0)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)
/home/aksuser $

有没有可能是 Java 版本的问题?因为它在本地运行良好,但在 Azure 中却无法正常运行。为什么在使用 GCP 时会涉及 ALPN?如果问题出在 Java 版本不支持 ALPN,那么有哪些可能的解决办法?

4

2 回答 2

1

gRPC 使用 Netty 进行 SSL。

根据 gRPC 的官方文档:

https://github.com/grpc/grpc-java/blob/master/SECURITY.md#netty

传输安全 (TLS)

HTTP/2 over TLS 要求使用 ALPN 来协商 h2 协议的使用并支持 AES 的 GCM 模式。

它们都是 Java 8,但似乎 Alpine 版本不支持 ALPN。这就是错误所说的。

ALPN 支持是在 Java 9 中引入的,并且似乎已向后移植到 OpenJDK 8u252。

您可以执行以下操作,也可以将 Azure Java 映像更改为支持 ALPN 的版本。

间接使用 ALPN 的应用程序更改

使用 Java 8 并依赖于提供 ALPN 支持的库(例如上面描述的 jetty-alpn-openjdk8-[client|server] 工件)的应用程序必须修改它们的启动方式。

对于仍在使用 8u252 之前的 OpenJDK 版本的应用程序,典型的命令行需要 bootclasspath 中的 alpn-boot jar 和使用 Jetty ALPN API 的库(此处以 jetty-alpn-openjdk8-server 为例)在类路径中:

/opt/openjdk-8u242/bin/java -Xbootclasspath/p:/path/to/alpn-boot-8.1.13.v20181017.jar -classpath jetty-alpn-openjdk8-server-9.4.27.v20200227:...

对于想要使用 OpenJDK 8u252 或更高版本的同一应用程序,命令行变为:

/opt/openjdk-8u252/bin/java -classpath jetty-alpn-openjdk8-server-9.4.28.v20200408:...

也就是说,必须删除 -Xbootclasspath 选项,并且必须将库升级到支持向后移植的 OpenJDK ALPN API 的版本。

有关此问题的更多信息:https://webtide.com/jetty-alpn-java-8u252/

于 2021-11-03T09:59:54.457 回答
0

理想情况下,需要安装支持 ALPN 的 Java 版本才能完成这项工作。由于您已将应用程序部署在基于容器的环境中,请尝试使用支持 ALPN 的 JDK 映像单独自定义您的应用程序,以便环境中的其他应用程序不会中断。

您可以从此链接检查您的 java 版本是否与 GCP 兼容 -

https://github.com/googleapis/google-cloud-java/tree/main/google-cloud-util/google-cloud-compat-checker

于 2021-11-16T13:51:36.830 回答