我正在尝试将大型 CSV 发送到 kafka。基本结构是读取 CSV 的一行并将其与标题一起压缩。
a = dict(zip(header, line.split(",")
然后将其转换为带有以下内容的json:
message = json.dumps(a)
然后我使用 kafka-python 库发送消息
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
使用 PYSPARK,我很容易从 CSV 文件中创建了消息的 RDD
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
现在我想发送这些消息:我定义了一个函数
def sendkafka(message):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
return producer.send_messages('topic',message)
然后我创建一个新的 RDD 来发送消息
sentRDD = messageRDD.map(lambda x: kafkasend(x))
然后我调用 sendRDD.count()
哪个开始搅动和发送消息
不幸的是,这非常慢。它每秒发送 1000 条消息。这是在一个 10 个节点的集群上,每个集群有 4 个 CPU 和 8GB 内存。
相比之下,在 1000 万行 csv 上创建消息大约需要 7 秒。~ 约 2GB
我认为问题在于我在函数内部实例化了一个 kafka 生产者。但是,如果我不这样做,那么 spark 会抱怨即使我尝试在全球范围内定义生产者也不存在。
也许有人可以阐明如何解决这个问题。
谢谢,