我的目标是从非文件源(即在程序中生成或通过 API 发送)获取数据并将其发送到 spark 流。为此,我通过基于python 的方式 KafkaProducer
发送数据:
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
$ python
Python 3.6.1| Anaconda custom (64-bit)
> from kafka import KafkaProducer
> import time
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time())
> producer.close()
> exit()
我的问题是从消费者 shell 脚本检查主题时没有出现任何内容:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic my-topic
^C$
这里有什么遗漏或错误吗?我是 spark/kafka/messaging 系统的新手,所以任何事情都会有所帮助。Kafka 版本是 0.11.0.0 (Scala 2.11),并且没有对配置文件进行任何更改。