Jak zaimportować dane z mongodb do pandy?


99

Mam dużą ilość danych w kolekcji w mongodb, które muszę przeanalizować. Jak zaimportować te dane do pand?

Jestem nowy w pandach i odrętwiały.

EDYCJA: Kolekcja mongodb zawiera wartości czujników oznaczone datą i godziną. Wartości czujnika są typu float.

Przykładowe dane:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}

Korzystanie z niestandardowego typu pola w MongoEngine może sprawić, że przechowywanie i pobieranie Pandas DataFrames będzie tak proste, jakmongo_doc.data_frame = my_pandas_df
Jthorpe

Odpowiedzi:


134

pymongo może ci pomóc, oto kilka kodów, których używam:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df

Dzięki, to jest metoda, której użyłem. Miałem też tablicę dokumentów osadzonych w każdym wierszu. Musiałem więc powtórzyć to również w każdym wierszu. Czy jest lepszy sposób na zrobienie tego?
Nithin

Czy można podać próbki struktury Twojego mongodb?
waitkuo

3
Zwróć uwagę, że list()wnętrze jest df = pd.DataFrame(list(cursor))oceniane jako lista lub generator, aby procesor był chłodny. Jeśli masz milion jeden elementów danych, a kilka następnych wierszy byłoby rozsądnie podzielonych, na poziomie szczegółowości i obciętych, cały shmegegge jest nadal bezpieczny. Świetnie.
Phlip

2
Jest bardzo wolny @ df = pd.DataFrame(list(cursor)). Czyste wyszukiwanie db jest znacznie szybsze. Czy moglibyśmy zmienić listcasting na coś innego?
Peter.k

1
@Peter ta linijka również zwróciła moją uwagę. Rzutowanie kursora bazy danych, który ma być iterowalny i potencjalnie opakowuje duże ilości danych, na listę w pamięci, nie wydaje mi się mądre.
Rafa

41

Możesz załadować dane mongodb do pandy DataFrame używając tego kodu. Mi to pasuje. Mam nadzieję, że dla ciebie też.

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))

24

Monaryrobi dokładnie to i jest super szybki . ( inny link )

Zobacz ten fajny post, który zawiera krótki samouczek i trochę czasu.


Czy Monary obsługuje łańcuchowy typ danych?
Snehal Parmar

Próbowałem Monary, ale zajmuje to dużo czasu. Czy brakuje mi jakiejś optymalizacji? Tried client = Monary(host, 27017, database="db_tmp") columns = ["col1", "col2"] data_type = ["int64", "int64"] arrays = client.query("db_tmp", "coll", {}, columns, data_type)For 50000records trwa 200s.
nishant

To brzmi bardzo wolno ... Szczerze mówiąc, nie wiem, jaki jest stan tego projektu, teraz, 4 lata później ...
shx2

17

Zgodnie z PEP, proste jest lepsze niż skomplikowane:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

Możesz uwzględnić warunki, tak jak w przypadku zwykłej bazy danych mongoDB, lub nawet użyć funkcji find_one (), aby pobrać tylko jeden element z bazy danych itp.

i voila!


pd.DataFrame.from_records wydaje się działać tak wolno, jak DataFrame (list ()), ale wyniki są bardzo niespójne. %% czas pokazał od 800 ms do 1,9 s
AFD

1
Nie jest to dobre dla dużych rekordów, ponieważ nie pokazuje błędu pamięci, natychmiastowe zawieszanie się systemu dla zbyt dużych danych. podczas gdy pd.DataFrame (lista (kursor)) pokazuje błąd pamięci.
Amulya Acharya

13
import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)

9

Aby efektywnie radzić sobie z danymi poza rdzeniem (nie mieszczącymi się w pamięci RAM) (tj. Z wykonywaniem równoległym), możesz wypróbować ekosystem Python Blaze : Blaze / Dask / Odo.

Blaze (i Odo ) ma gotowe funkcje do obsługi MongoDB.

Kilka przydatnych artykułów na początek:

Oraz artykuł, który pokazuje, jakie niesamowite rzeczy są możliwe dzięki stosowi Blaze: Analiza 1,7 miliarda komentarzy Reddit z Blaze i Impala (w skrócie, przeszukiwanie 975 GB komentarzy Reddit w kilka sekund).

PS Nie jestem powiązany z żadną z tych technologii.


1
Napisałem również post przy użyciu Jupyter Notebook z przykładem, w jaki sposób Dask pomaga przyspieszyć wykonywanie nawet na danych pasujących do pamięci, używając wielu rdzeni na jednym komputerze.
Dennis Golomazov

8

Inną opcją, którą uznałem za bardzo przydatną, jest:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

w ten sposób możesz bezpłatnie rozwijać zagnieżdżone dokumenty mongodb.


2
Mam błąd przy tej metodzieTypeError: data argument can't be an iterator
Gabriel Fair

2
Dziwne, to działa na moim Pythonie 3.6.7przy użyciu pand 0.24.2. Może możesz spróbować df = json_normalize(list(cursor))zamiast tego?
Ikar Pohorský

Za +1. docs, argument max_level określa maksymalny poziom głębokości dyktowania. Właśnie wykonałem test i to nieprawda, więc niektóre kolumny musiałyby zostać podzielone za pomocą serwerów accesrors .str. Mimo to bardzo fajna funkcja do pracy z mongodb.
Mauricio Maroto

5

Za pomocą

pandas.DataFrame(list(...))

będzie zużywać dużo pamięci, jeśli wynik iteratora / generatora jest duży

lepiej generować małe fragmenty i konkatować na końcu

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)


1

Po tej wspaniałej odpowiedzi, waitkuo , chciałbym dodać możliwość zrobienia tego za pomocą chunksize zgodnie z .read_sql () i .read_csv () . Rozszerzam odpowiedź Deu Leung , unikając przechodzenia po kolei każdego „rekordu” „iteratora” / „kursora”. Pożyczę poprzednią funkcję read_mongo .

def read_mongo(db, 
           collection, query={}, 
           host='localhost', port=27017, 
           username=None, password=None,
           chunksize = 100, no_id=True):
""" Read from Mongo and Store into DataFrame """


# Connect to MongoDB
#db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
client = MongoClient(host=host, port=port)
# Make a query to the specific DB and Collection
db_aux = client[db]


# Some variables to create the chunks
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
if len(skips_variable)<=1:
    skips_variable = [0,len(skips_variable)]

# Iteration to create the dataframe in chunks.
for i in range(1,len(skips_variable)):

    # Expand the cursor and construct the DataFrame
    #df_aux =pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]]))
    df_aux =pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

    if no_id:
        del df_aux['_id']

    # Concatenate the chunks into a unique df
    if 'df' not in locals():
        df =  df_aux
    else:
        df = pd.concat([df, df_aux], ignore_index=True)

return df

1

Podobne podejście jak Rafael Valero, waitkuo i Deu Leung przy użyciu paginacji :

def read_mongo(
       # db, 
       collection, query=None, 
       # host='localhost', port=27017, username=None, password=None,
       chunksize = 100, page_num=1, no_id=True):

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # Sorry, this is in spanish
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df

0

Możesz osiągnąć to, co chcesz z pdmongo w trzech liniach:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")

Jeśli Twoje dane są bardzo duże, możesz najpierw wykonać zapytanie zagregowane, filtrując niepotrzebne dane, a następnie mapując je na żądane kolumny.

Oto przykład mapowania Readings.ado kolumny ai filtrowania według reportCountkolumny:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [{'$match': {'reportCount': {'$gt': 6}}}, {'$unwind': '$Readings'}, {'$project': {'a': '$Readings.a'}}], "mongodb://localhost:27017/mydb")

read_mongoakceptuje te same argumenty, co agregat pymongo

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.