在我的 pyspark 应用程序中,我打算使用 Spark 流作为一种在“飞行中”转换 Kafka 消息的方法。每条这样的消息最初都是从特定的 Kafka 主题接收的。此类消息将需要进行一些转换(比如说 - 用一个字符串替换另一个字符串),并且转换后的版本需要发布在不同的 Kafka 主题上。第一部分(接收 Kafka 消息)似乎工作正常:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
...
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
将某些东西(比如说 - 一个字符串)放到不同的 Kafka 主题上的正确语法是什么?这种方法应该由 KafkaUtils 提供,还是以其他方式提供?