0

我尝试使用pyspark将 spark 和 kafka 集成到 Jupyter notebook中。这是我的工作环境。

Spark 版本:Spark 2.2.1 Kafka 版本:Kafka_2.11-0.8.2.2 Spark 流式 kafka jar:spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar

我在 spark-defaults.conf 文件中添加了一个 Spark 流式 kafka 程序集 jar 文件。

当我启动 pyspark 流的 streamingContext 时,此错误显示为无法从 MANIFEST.MF 读取 kafka 版本。

在此处输入图像描述

这是我的代码。

from pyspark import SparkContext, SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import os

from kafka import KafkaProducer

#Receive data handler
def handler(message):
    records = message.collect()
    for record in records:
        print(record)
        #producer.send('receive', str(res))
        #producer.flush()

producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)

#Create Kafka streaming with argv
zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})
kvs.foreachRDD(handler)

ssc.start()
4

1 回答 1

2

抱歉我在 Scala 中发帖

Spark 2.2.1 与 Scala 2.11 和 Kafka 0.10 都可以工作,尽管它们被标记为实验性的

如果使用上述库,创建流的正确方法是使用

val kStrream =  KafkaUtils.createDirectStream(
          ssc, PreferConsistent,
          Subscribe[String, String](Array("weblogs-text"), kafkaParams, fromOffsets))

注意依赖关系,例如 kafka 有特定于 Kafka Client 版本和 spark 版本版本的 jar 文件。

       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>
于 2018-11-21T05:15:26.887 回答