1

在我的 pyspark 应用程序中,我打算使用 Spark 流作为一种在“飞行中”转换 Kafka 消息的方法。每条这样的消息最初都是从特定的 Kafka 主题接收的。此类消息将需要进行一些转换(比如说 - 用一个字符串替换另一个字符串),并且转换后的版本需要发布在不同的 Kafka 主题上。第一部分(接收 Kafka 消息)似乎工作正常:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    ...

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

将某些东西(比如说 - 一个字符串)放到不同的 Kafka 主题上的正确语法是什么?这种方法应该由 KafkaUtils 提供,还是以其他方式提供?

4

2 回答 2

1

根据 SPARK 文档 https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd的正确方法

def kafka_sender(messages):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    for message in messages:
        producer.send('alerts', bytes(message[0].encode('utf-8')))
        # For faster push
        # producer.flush()  

    producer.flush()



# On your Dstream
sentiment_data.foreachRDD(lambda rdd: rdd.foreachPartition(kafka_sender))
于 2018-05-08T16:36:21.767 回答
1

在处理函数中,我们可以对每条记录执行任何操作,然后将该记录发送到不同的 kafka 主题:

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

要运行这个:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test
于 2016-05-21T01:30:48.747 回答