0

我正在使用带有 pyspark 的 Snappydata 来运行我的 sql 查询并将输出 DF 转换为字典以将其批量插入到 mongo 中。我已经经历了许多类似的问题来测试 spark DF 到 Dictionary 的转换。

目前我正在使用map(lambda row: row.asDict(), x.collect())这种方法将我的批量DF 转换为字典。10K 条记录需要 2-3 秒。

我在下面说明了我是如何实现我的想法的:

x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)

有没有更快的方法?

4

2 回答 2

0

我会研究您是否可以直接从 Spark 写入 Mongo,因为这将是最好的方法。

如果做不到这一点,您可以使用此方法:

x = snappySession.sql("select * from test")
dictionary_rdd = x.rdd.map(lambda row: row.asDict())

for d in dictionary_rdd.toLocalIterator():
    db.collection.insert_many(d)

这将以分布式方式在 Spark 中创建所有字典。这些行将返回给驱动程序并一次插入到 Mongo 中,这样您就不会耗尽内存。

于 2017-12-07T11:04:16.123 回答
0

我建议使用foreachPartition

(snappySession
    .sql("select * from test")
    .foreachPartition(insert_to_mongo))

其中insert_to_mongo

def insert_to_mongo(rows):
    client  = ...
    db = ...
    db.collection.insert_many((row.asDict() for row in rows))
于 2017-12-07T15:08:08.923 回答