I am trying to write a simple PySpark job that receives data from a Kafka broker topic, applies some transformations to that data, and puts the transformed data on a different Kafka broker topic.
I have the following code, which reads data from a Kafka topic, but running it has no effect on the sendkafka function:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()
    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
What should I change so that my sendkafka function actually sends data to the spark.out Kafka topic?
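My current guess is that mapPartitions is a lazy transformation and DStream.count() is also a transformation rather than an output operation, so sendkafka is never actually triggered. One direction I have considered, but am not sure about, is replacing the mapPartitions/count lines with foreachRDD, which is an output operation. This is only a sketch, assuming the same broker at localhost:9092 and the same spark.out topic:

```python
from kafka import SimpleProducer, KafkaClient  # same client library as above

def send_partition(messages):
    # Create the connection inside the partition: the producer cannot be
    # pickled on the driver and shipped to the executors.
    kafka = KafkaClient("localhost:9092")  # assumed broker address
    producer = SimpleProducer(kafka)
    for message in messages:
        # send_messages expects raw message payloads; no yield here,
        # so the loop runs eagerly instead of producing a lazy generator
        producer.send_messages('spark.out', str(message))
    kafka.close()

# foreachRDD / foreachPartition are output operations, so Spark
# executes them for every batch (unlike mapPartitions + count):
kvs.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))
```

If this is roughly right, the yield in my original sendkafka would be a second problem on top of the laziness, since the returned generator is never consumed.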