1

kafka-python客户端支持 Kafka 0.9 ,但显然不包含新的身份验证和加密功能,所以我的猜测是它只适用于开放服务器(与以前的版本一样)。在任何情况下,即使是 Java 客户端也需要一个特殊的消息中心登录模块来连接(或者从示例中可以看出),这表明除非有可用于 Python 的类似模块,否则什么都不会起作用。

我的具体场景是,我想使用同样托管在 Bluemix 中的 Jupyter 笔记本中的消息中心服务(Apache Spark 服务)。

4

4 回答 4

3

我能够使用 kafka-python 库进行连接:

$ pip install --user kafka-python

然后 ...

from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl

############################################
# Service credentials from Bluemix UI:
############################################
bootstrap_servers =   # kafka_brokers_sasl
sasl_plain_username = # user
sasl_plain_password = # password
############################################

sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism,
                         api_version=(0,10))

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

这对我来说是 Bluemix spark 作为 jupyter notebook 中的一项服务,但是请注意,这种方法不使用 spark。该代码只是在驱动程序主机上运行。

于 2016-10-30T01:13:28.653 回答
1

已请求 Kafka Python 客户端中的 SASL 支持:https ://github.com/dpkp/kafka-python/issues/533但在支持 Message Hub 使用的用户名/密码登录方法之前,它将不起作用

于 2016-02-09T16:06:16.880 回答
0

在 Bluemix Apache Spark 服务原生支持这一点之前,您可以采用与Realtime Sentiment Analysis 项目相同的方法。可以在cds labs spark samples github repo上找到此帮助代码。

于 2016-02-09T15:07:56.437 回答
0

我们在文档中添加了一些关于非 Java 语言支持的文本 - 请参阅“在非 Java 应用程序中连接和验证”部分: https ://www.ng.bluemix.net/docs/services/MessageHub/index .html

我们当前的身份验证方法是非标准的,Apache 项目不支持,但只是一个临时解决方案。Message Hub 团队正在与 Apache Kafka 社区合作开发KIP-43。一旦完成,我们将更改 Message Hub 身份验证实现以匹配,并且可以使用任何语言实现该规范的客户端。

于 2016-02-11T13:01:08.767 回答