0

我有以下 pyspark 脚本,假设连接到本地 kafka 集群:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

当我运行它时,我收到以下错误:

File "/home/ubuntu/spark-1.3.0-bin-hadoop2.4/hello1.py", line 16, in main
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
AttributeError: type object 'KafkaUtils' has no attribute 'createDirectStream'

我应该怎么做才能访问 KafkaUtils.createDirectStream ?

4

1 回答 1

1

您使用的是 Spark 1.3.0,并且 Python 版本createDirectStream已在 Spark 1.4.0 中引入。Spark 1.3 仅提供 Scala 和 Java 实现。

如果你想使用直接流,你必须更新你的 Spark 安装。

于 2016-05-19T15:24:06.533 回答