我尝试使用pyspark将 spark 和 kafka 集成到 Jupyter notebook中。这是我的工作环境。
Spark 版本:Spark 2.2.1 Kafka 版本:Kafka_2.11-0.8.2.2 Spark 流式 kafka jar:spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
我在 spark-defaults.conf 文件中添加了一个 Spark 流式 kafka 程序集 jar 文件。
当我启动 pyspark 流的 streamingContext 时,此错误显示为无法从 MANIFEST.MF 读取 kafka 版本。
这是我的代码。
from pyspark import SparkContext, SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import os
from kafka import KafkaProducer
#Receive data handler
def handler(message):
records = message.collect()
for record in records:
print(record)
#producer.send('receive', str(res))
#producer.flush()
producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)
#Create Kafka streaming with argv
zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})
kvs.foreachRDD(handler)
ssc.start()