
I am using the following code to fetch messages from Kafka.

Scala code:

val lines: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
 zookeeperQuorum, consumerGroup, topicMap)
lines.print(10)

Here is my sample producer code.

    from kafka import SimpleProducer, KafkaClient
    import time
    # To send messages synchronously
    kafka = KafkaClient(serverip+':'+port)
    producer = SimpleProducer(kafka)
    kafka.ensure_topic_exists('test')

    kafka.ensure_topic_exists('test1')

    while(1):
        print "sending message "
        producer.send_messages(b'test', 'test,msg')
        time.sleep(2)
        producer.send_messages(b'test1', 'test1,msg')
        time.sleep(2)

My streaming receiver prints

(null,'test,msg')
(null,'test1,msg')

Questions:

1) How can I differentiate messages per topic without actually
decoding the message?

2) Why is it giving me null in the output? The documentation says
each record is a key,value tuple. How can I produce that kind of
key,value message?

EDIT: Using KeyedProducer

from kafka import KafkaClient, KeyedProducer
import time

kafka = KafkaClient(serverip+':'+port)
producer = KeyedProducer(kafka)

kafka.ensure_topic_exists('test2')

while(1):
    print "sending msg "
    producer.send_messages(b'test2', b'key1', 'msg')
    time.sleep(2)

This gives me the following error:

raise PartitionUnavailableError("%s not available" % str(key))                                                                                                                            
kafka.common.PartitionUnavailableError: TopicAndPartition(topic='test2', partition='key1') not available   

2 Answers


For #1, the easiest way would be to set up a separate stream for each topic; if at any point you need to combine them and they have the same structure, you can union them, as sketched below.
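
A minimal sketch of that per-topic layout, reusing ssc, zookeeperQuorum and consumerGroup from the question (the one-receiver-per-topic split and the topic tag added in the map step are assumptions here, not something from the original post):

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.dstream.DStream

// One receiver-based stream per topic, so the origin of each record is known
// without decoding the payload. The Map value is the number of consumer threads.
val testLines  = KafkaUtils.createStream(ssc, zookeeperQuorum, consumerGroup, Map("test" -> 1))
val test1Lines = KafkaUtils.createStream(ssc, zookeeperQuorum, consumerGroup, Map("test1" -> 1))

// Tag each record with its topic, then union the streams since they share a structure.
val combined: DStream[(String, String)] =
  testLines.map { case (_, msg) => ("test", msg) }
    .union(test1Lines.map { case (_, msg) => ("test1", msg) })

combined.print(10)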

For #2, have you tried using a KeyedProducer?

Snippet from the link above:

producer = KeyedProducer(kafka)
producer.send_messages(b'my-topic', b'key1', b'some message')
producer.send_messages(b'my-topic', b'key2', b'this methode')
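
On the Spark side, once messages are sent with a key, the tuples in the question's lines stream should carry that key instead of null; a hedged sketch of using it (lines is assumed to be the same DStream as in the question):

// Each record is a (key, value) tuple; with a KeyedProducer the key should no longer
// be null, so it can be used for routing without decoding the message body.
lines.map { case (key, msg) => s"key=$key msg=$msg" }.print(10)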
Answered 2016-01-11T16:43:11.600

For question no. 1 you can use this signature:

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R]

This gives you access to the MessageAndMetadata class, which contains the topic name along with other metadata such as the partition number and the message offset. For example:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Map[String, String]](
  ssc,
  Map("metadata.broker.list" -> "localhost:9092"),
  topics,
  (mm: MessageAndMetadata[String, String]) => Map(mm.topic -> mm.message))

Then you can pattern match on the map key to do whatever you need, as in the sketch below.
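
A hedged sketch of that pattern matching, assuming the direct stream above is assigned to a val named stream and the topics are the test/test1 from the question:

// Each record is the Map(topic -> message) produced by the messageHandler above;
// match on the topic name to route messages without decoding the payload.
stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    record.foreach {
      case ("test", msg)  => println(s"from test: $msg")
      case ("test1", msg) => println(s"from test1: $msg")
      case (topic, msg)   => println(s"from $topic: $msg")
    }
  }
}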

Answered 2016-07-09T01:28:32.920