I am using the following code to fetch messages from Kafka.
Scala code:
val lines: ReceiverInputDStream[(String, String)] =
  KafkaUtils.createStream(ssc, zookeeperQuorum, consumerGroup, topicMap)
lines.print(10)
Here is my sample producer code:
from kafka import SimpleProducer, KafkaClient
import time
# To send messages synchronously
kafka = KafkaClient(serverip+':'+port)
producer = SimpleProducer(kafka)
kafka.ensure_topic_exists('test')
kafka.ensure_topic_exists('test1')
while(1):
    print "sending message "
    producer.send_messages(b'test', 'test,msg')
    time.sleep(2)
    producer.send_messages(b'test1', 'test1,msg')
    time.sleep(2)
My streaming receiver prints:
(null,'test,msg')
(null,'test1,msg')
Questions:
1) How can I tell which topic a message came from without actually
decoding the message?
2) Why does the output show null as the first element? According to the
documentation, each record is a (key, value) tuple. How can I produce
a message that carries a key as well as a value?
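To make question 2 concrete, here is a minimal plain-Python sketch of how I understand the (key, value) pair. It does not talk to Kafka at all; `Message`, `send_unkeyed`, `send_keyed` and `group_by_key` are hypothetical names made up for this illustration, and using the topic name as the key is only my assumption about one possible way to tell topics apart.

```python
from collections import namedtuple

# Hypothetical stand-in for a Kafka record: a (key, value) pair.
Message = namedtuple("Message", ["key", "value"])


def send_unkeyed(value):
    # Models a producer that sends no key: the key slot stays empty (None).
    return Message(key=None, value=value)


def send_keyed(key, value):
    # Models a producer that attaches an explicit key to the value.
    return Message(key=key, value=value)


def group_by_key(messages):
    # Groups values by key without inspecting the value contents.
    groups = {}
    for message in messages:
        groups.setdefault(message.key, []).append(message.value)
    return groups


# What I see today: no key, topic name embedded in the value.
unkeyed = [send_unkeyed("test,msg"), send_unkeyed("test1,msg")]

# What I would like: the topic name (assumed here) carried as the key.
keyed = [send_keyed("test", "msg"), send_keyed("test1", "msg")]

for message in unkeyed + keyed:
    print(tuple(message))
```

With keyed messages, `group_by_key(keyed)` separates the two topics without parsing the value, which is the behaviour I am after on the Spark side.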
EDIT: Using KeyedProducer
kafka = KafkaClient(serverip+':'+port)
producer = KeyedProducer(kafka)
kafka.ensure_topic_exists('test2')
while(1):
    print "sending msg "
    producer.send_messages(b'test2',b'key1','msg')
    time.sleep(2)
This gives me the following error:
raise PartitionUnavailableError("%s not available" % str(key))
kafka.common.PartitionUnavailableError: TopicAndPartition(topic='test2', partition='key1') not available