2

我正在尝试订阅一个 Apache Pulsar 主题,并且我确定租户、命名空间和主题都存在,因为其他消费者能够订阅这个完全相同的主题。我已经仔细检查了我的 URL 和端口是否正确,并且我使用了正确的信任证书。但是,我得到了异常Error Checking/Getting Partition Metadata while Subscribing,它不会连接。我正在尝试通过 TLS 连接 SSL。

我的 Python 代码看起来像这样,但正如我在下面提到的,我在 Java 和 Go 中使用了相同的方法,并且在每种语言中都得到了相同的结果

import pulsar
client = pulsar.Client('pulsar+ssl://my-pulsar-ms-tls.mydomain.com:6651',
                       tls_trust_certs_file_path='./ca.cert.pem')
consumer = client.subscribe(topic='persistent://my-tenant/ingest/my-topic-name',
                            subscription_name='my-topic-name-source')

我得到的例外情况详述如下:

2020-06-05 19:55:05.347 INFO  Client:88 | Subscribing on Topic :persistent://my-tenant/ingest/my-topic-name  
2020-06-05 19:55:05.348 INFO  ConnectionPool:72 | Created connection for pulsar+ssl://my-pulsar-ms-tls.mydomain.com:6651  
2020-06-05 19:55:05.540 INFO  ClientConnection:324 | [10.90.3.86:59058 ->
10.20.13.58:6651] Connected to broker  
2020-06-05 19:55:05.582 ERROR ClientConnection:382 | [10.90.3.86:59058 -> 10.20.13.58:6651] Handshake failed: Connection reset by peer  
2020-06-05 19:55:05.582 INFO  ClientConnection:1337 | [10.90.3.86:59058 -> 10.20.13.58:6651] Connection closed  
2020-06-05 19:55:05.582 ERROR ClientImpl:384 | Error Checking/Getting Partition Metadata while Subscribing on persistent://my-tenant/ingest/my-topic-name -- 5  
2020-06-05 19:55:05.582 INFO  ClientConnection:229 | [10.90.3.86:59058 ->
10.20.13.58:6651] Destroyed connection Traceback (most recent call last):   File "/Users/exampleUser/PycharmProjects/ledger/test.py", line 7, in <module>
    subscription_name='my-topic-name-source')   File "/Users/exampleUser/PycharmProjects/ledger/venv/lib/python3.7/site-packages/pulsar/__init__.py", line 604, in subscribe
    c._consumer = self._client.subscribe(topic, subscription_name, conf) Exception: Pulsar error: ConnectError

我在使用 Java 和 Go 时遇到了同样的异常,所以它不是特定于 Python 的。

问题是什么?

4

1 回答 1

2

问题是本主题希望您提供身份验证令牌,但您没有提供。

您需要做的就是修改您的 pulsar Client 构造函数以传递身份验证令牌,如下所示:

// trust_cert_path could be "ca.cert.pem", if that file is accessible, for example.
client = pulsar.Client(pulsarBrokerURL,
                         authentication=pulsar.AuthenticationToken(pulsarBrokerToken), 
                         tls_trust_certs_file_path=trust_cert_path)

在 Go 中,它看起来像这样:

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:                     pulsarBrokerURL,
    OperationTimeoutSeconds: 5,
    MessageListenerThreads:  runtime.NumCPU(),
    TLSTrustCertsFilePath:   trust_cert_path,
    Authentication:          pulsar.NewAuthenticationToken(pulsarBrokerToken),
})

其中 pulsarBrokerToken 只是一个字符串。

在 Java 中,它看起来像这样:

// client is already defined
client = PulsarClient.builder()
        .serviceUrl(pulsarBrokerUrl)
        .tlsTrustCertsFilePath(trust_cert_path)
        .authentication(
                AuthenticationFactory.token(pulsarBrokerToken) )
        .build();

如果您不知道如何获取身份验证令牌(如果您不管理 Pulsar 集群),请询问维护您的 Pulsar 集群的人员以提供此身份验证令牌。

于 2020-06-06T04:28:06.057 回答