8

我正在尝试编写一个简单的 pyspark 作业,它将从 kafka 代理主题接收数据,对该数据进行一些转换,并将转换后的数据放在不同的 kafka 代理主题上。

我有以下代码,它从 kafka 主题中读取数据,但对运行 sendkafka 函数没有影响:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

为了让我的 sendkafka 函数真正将数据发送到 spark.out kafka 主题,我应该改变什么?

4

1 回答 1

14

下面是正确的代码,它从 Kafka 读取到 Spark,并将 Spark 数据写回不同的 kafka 主题:

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

运行它的方法是:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test
于 2016-05-21T01:17:26.400 回答