
I have a large amount of data in a collection in mongodb that I need to analyze. How do I import that data into pandas?

I am new to pandas and numpy.

Edit: The mongodb collection contains sensor values tagged with date and time. The sensor values are of float datatype.

Sample data:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}

14 Answers


pymongo might give you a hand; the following is some code I'm using:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
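
For the question's data, a call might look like this (a minimal sketch; 'mydb' and 'sensor_reports' are assumed database/collection names):

# Hypothetical names: adjust 'mydb'/'sensor_reports' to your deployment
df = read_mongo('mydb', 'sensor_reports', query={'sensorName': '56847890-0'})
print(df.shape)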
Answered on 2013-04-27T18:45:56.430

You can use this code to load your mongodb data into a pandas DataFrame. It works for me. Hopefully it works for you too.

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))
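
If you only need a subset, find() takes a filter and a projection in the usual pymongo way; a sketch (field names from the question's sample data, threshold assumed):

# Fetch only matching documents, and only the fields of interest
data = pd.DataFrame(list(collection.find(
    {'reportCount': {'$gte': 5}},       # filter: assumed threshold
    {'sensorName': 1, 'Readings': 1}    # projection: keep just these fields
)))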
Answered on 2014-12-23T09:15:23.870

Monary does exactly that, and it is super fast. (another link)

See this cool post, which includes a quick tutorial and some timings.
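
A minimal sketch of the Monary route, assuming flat numeric fields named a and b in a 'mydb'/'sensors' database/collection (all names here are assumptions):

from monary import Monary
import numpy as np
import pandas as pd

columns = ['a', 'b']          # fields to pull; types must match the stored BSON data
m = Monary('127.0.0.1')
arrays = m.query('mydb', 'sensors', {}, columns, ['float64'] * len(columns))
m.close()

# Monary returns one numpy (masked) array per requested column
df = pd.DataFrame(np.column_stack(arrays), columns=columns)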

Answered on 2013-12-19T22:33:39.947

As per PEP, simple is better than complex:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

You can include conditions just as you would when working with a regular mongoDB database, and even use find_one() to get only one element from the database, etc. A sketch of that with a condition, using the question's field names ('mydb' and 'sensor_reports' are assumed names):
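
import pandas as pd
from pymongo import MongoClient

db = MongoClient()  # assumes a local mongod on the default port

# Only the reports for one sensor; the cursor feeds from_records directly
df = pd.DataFrame.from_records(
    db.mydb.sensor_reports.find({'sensorName': '56847890-0'})
)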

Voila!

Answered on 2016-10-23T11:43:19.377
import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)
Answered on 2016-10-20T23:33:06.273

Another option I found very useful is:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

(or json_normalize(list(cursor)), depending on your python/pandas versions).

That way you get the unfolding of nested mongodb documents for free.
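
For the nested Readings array in the question, record_path and meta make that unfolding explicit; a sketch (my_collection as above):

from pandas.io.json import json_normalize

cursor = my_collection.find()
# One row per element of the nested 'Readings' list,
# with the parent-level 'sensorName' repeated alongside
df = json_normalize(list(cursor), record_path='Readings', meta=['sensorName'])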

Answered on 2018-03-29T08:57:17.337

For dealing with out-of-core (not fitting into RAM) data efficiently (i.e. with parallel execution), you can try the Python Blaze ecosystem: Blaze / Dask / Odo.

Blaze (and Odo) has out-of-the-box functions to deal with MongoDB.
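
A minimal sketch of binding Blaze to a collection (the URI and all names are assumptions; evaluation stays lazy until you materialize):

import blaze as bz
import pandas as pd
from odo import odo

# Nothing is fetched yet; this just binds to the collection
data = bz.Data('mongodb://localhost/mydb::sensor_reports')

# Materialize a filtered expression into a DataFrame
df = odo(data[data.reportCount > 5], pd.DataFrame)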

A few useful articles to start off:

And an article which shows what amazing things are possible with the Blaze stack: Analyzing 1.7 Billion Reddit Comments with Blaze and Impala (essentially, querying 975 GB of Reddit comments in seconds).

P.S. I'm not affiliated with any of these technologies.

Answered on 2016-09-27T00:16:38.680

Using

pandas.DataFrame(list(...))

will consume a lot of memory if the iterator/generator result is large.

Better to generate small chunks and concat at the end:

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)
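
Usage is then a one-liner (db/collection names and chunk size assumed):

import pandas as pd
from pymongo import MongoClient

collection = MongoClient().mydb.sensor_reports   # assumed names
df = iterator2dataframes(collection.find(), chunk_size=10000)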
Answered on 2016-09-12T08:19:15.293

http://docs.mongodb.org/manual/reference/mongoexport

Export to CSV and use read_csv, or export to JSON and use DataFrame.from_records().
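
A sketch of that route ('mydb'/'sensor_reports' and the field list are assumptions; the mongoexport calls run in a shell, shown here as comments):

import json
import pandas as pd

# CSV route; in a shell first:
#   mongoexport --db mydb --collection sensor_reports --type=csv \
#       --fields sensorName,reportCount --out reports.csv
df = pd.read_csv('reports.csv')

# JSON route (mongoexport writes one document per line by default):
#   mongoexport --db mydb --collection sensor_reports --out reports.json
with open('reports.json') as f:
    df = pd.DataFrame.from_records(json.loads(line) for line in f)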

Answered on 2013-04-27T11:32:35.263

Following this great answer by waitingkuo, I would like to add the possibility of doing the same with chunksize, in line with .read_sql() and .read_csv(). I enlarge the answer from Deu Leung by avoiding going one by one over each "record" of the "iterator"/"cursor". I will borrow the previous read_mongo function.

def read_mongo(db,
               collection, query={},
               host='localhost', port=27017,
               username=None, password=None,
               chunksize=100, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    #db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
    client = MongoClient(host=host, port=port)
    # Make a query to the specific DB and Collection
    db_aux = client[db]

    # Chunk boundaries; append the total count so the last
    # (possibly partial) chunk is not dropped
    count = db_aux[collection].find(query).count()
    skips_variable = list(range(0, count, int(chunksize)))
    if not skips_variable or skips_variable[-1] < count:
        skips_variable.append(count)

    # Start empty so an empty query still returns a DataFrame
    df = pd.DataFrame()

    # Iteration to create the dataframe in chunks.
    for i in range(1, len(skips_variable)):

        # Expand the cursor slice and construct the chunk's DataFrame
        df_aux = pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

        if no_id:
            del df_aux['_id']

        # Concatenate the chunks into a unique df
        df = pd.concat([df, df_aux], ignore_index=True)

    return df
Answered on 2018-03-06T10:43:53.037

A similar approach to the ones of Rafael Valero, waitingkuo and Deu Leung, using pagination:

def read_mongo(
        db,
        collection, query=None,
        host='localhost', port=27017, username=None, password=None,
        chunksize=100, page_num=1, no_id=True):

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # query=None (not query={}) avoids the classic mutable-default-argument
    # pitfall; see (in Spanish):
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))

    # Delete the _id (guarded, since a page past the end comes back empty)
    if no_id and '_id' in df:
        del df['_id']

    return df
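
A caller can then walk the collection page by page until an empty frame comes back; a sketch (db/collection names and page size assumed, relying on the guarded _id deletion above so an empty last page is harmless):

import pandas as pd

frames = []
page = 1
while True:
    chunk = read_mongo('mydb', 'sensor_reports', chunksize=1000, page_num=page)
    if chunk.empty:
        break
    frames.append(chunk)
    page += 1

df = pd.concat(frames, ignore_index=True)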
Answered on 2018-03-20T01:19:26.273

You can achieve what you want with pdmongo in three lines:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")

If your data is very large, you can do an aggregate query first, filtering out the data you do not want and mapping it to your desired columns.

Here is an example of mapping Readings.a to column a and filtering by the reportCount column:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [{'$match': {'reportCount': {'$gt': 6}}}, {'$unwind': '$Readings'}, {'$project': {'a': '$Readings.a'}}], "mongodb://localhost:27017/mydb")

read_mongo accepts the same arguments as pymongo aggregate.

Answered on 2020-08-05T20:19:10.750

You can also use pymongoarrow, an official library offered by MongoDB for exporting mongodb data to pandas, NumPy, Parquet files, etc.
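
A minimal sketch with pymongoarrow (db/collection names assumed; the api module also offers find_numpy_all and find_arrow_all):

from pymongo import MongoClient
from pymongoarrow.api import find_pandas_all

coll = MongoClient().mydb.sensor_reports   # assumed names
df = find_pandas_all(coll, {'reportCount': {'$gte': 5}})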

Answered on 2021-05-11T16:57:03.703

You can use the pandas.json_normalize method:

import pandas as pd
from IPython.display import display  # display() is built in inside a notebook

display(pd.json_normalize(x))
display(pd.json_normalize(x, record_path="Readings"))

It should display two tables, where x is your cursor or:

from bson import ObjectId
def ISODate(st):
    return st

x = {
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}
Answered on 2021-11-07T03:02:47.660