3

我有一个可以工作的python kafka,它是代码:

class TokenProvider(object):
    
    def __init__(self,client_id,client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
    def token(self):
        token_url = 'https://test.com/protocol/openid-connect/token'
        client = BackendApplicationClient(client_id=self.client_id)
        oauth = OAuth2Session(client=client)
        token_json = oauth.fetch_token(token_url=token_url, client_id=self.client_id, client_secret=self.client_secret)
        token = token_json['access_token']
        #print(token)
        return token

consumer = KafkaConsumer(
    group_id=None,
    bootstrap_servers=['test.com:9094'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=TokenProvider(client_id,client_secret),
    ssl_check_hostname=False,
    ssl_context=create_ssl_context(),
    auto_offset_reset=offset,
    enable_auto_commit=False,
    value_deserializer=lambda m: decode(m)
    )
consumer.subscribe(topics=['test.stream'])

我的融合 python 如下,我收到此错误

cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}

c = Consumer({
    'bootstrap.servers': 'test.com:9094',
    'sasl.mechanism': 'OAUTHBEARER',
    'security.protocol': 'SASL_SSL',
    'oauthbearer_token_refresh_cb': TokenProvider(client_id,client_secret),
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test.stream']) 

那么如何让融合的 kafka 工作呢?我似乎对使用 OAUTHBEARER 和 SASL_SSL 的 oauthbearer_token_refresh_cb 有问题。

本质上,我使用 jwt 令牌进行身份验证

4

2 回答 2

0

从源码看,python 和 go 客户端还不支持 oauthbearer

于 2020-11-22T09:38:45.727 回答
0

根据https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md上的文档,该oauthbearer_token_refresh_cb选项必须使用rd_kafka_conf_set_oauthbearer_token_refresh_cb(). 但是请注意,您正在尝试将其设置TokenProvider为不可调用的实例,因此您可能希望通过TokenProvider(...).token.

SASL/OAUTHBEARER 令牌刷新回调(使用 rd_kafka_conf_set_oauthbearer_token_refresh_cb() 设置,由 rd_kafka_poll() 等触发。该回调将在客户端刷新客户端的 OAUTHBEARER 令牌时触发。

于 2020-11-19T16:19:55.030 回答