Jak nasłuchiwać zmian w kolekcji MongoDB?


200

Tworzę rodzaj systemu kolejek zadań w tle z MongoDB jako magazynem danych. Jak „nasłuchiwać” wstawek do kolekcji MongoDB przed spawnowaniem pracowników w celu przetworzenia zadania? Czy muszę sondować co kilka sekund, aby zobaczyć, czy są jakieś zmiany od ostatniego razu, czy też jest jakiś sposób, w jaki mój skrypt może czekać na wstawienie? To projekt PHP, nad którym pracuję, ale nie wahaj się odpowiedzieć w Ruby lub agnostyce językowej.


1
Zmianę strumieni dodano w MongoDB 3.6, aby rozwiązać Twój scenariusz. docs.mongodb.com/manual/changeStreams Także jeśli korzystasz z MongoDB Atlas, możesz użyć wyzwalaczy ściegów, które umożliwiają wykonywanie funkcji w odpowiedzi na wstawianie / aktualizację / usuwanie / itp. docs.mongodb.com/stitch/triggers/overview Nie trzeba już analizować oplogu.
Robert Walters

Odpowiedzi:


111

To, o czym myślisz, przypomina dźwięki wyzwalające. MongoDB nie obsługuje żadnych wyzwalaczy, jednak niektóre osoby „rzuciły własne” za pomocą niektórych sztuczek. Kluczem tutaj jest oplog.

Po uruchomieniu MongoDB w zestawie repliki wszystkie akcje MongoDB są rejestrowane w dzienniku operacji (znanym jako oplog). Oplog to w zasadzie tylko bieżąca lista modyfikacji wprowadzonych do danych. Repliki Ustawia funkcję, nasłuchując zmian w tym oplogu, a następnie stosując zmiany lokalnie.

Czy to brzmi znajomo?

Nie mogę tutaj szczegółowo opisać całego procesu, jest to kilka stron dokumentacji, ale potrzebne narzędzia są dostępne.

Najpierw kilka wpisów na oplogu - Krótki opis - Układ localkolekcji (która zawiera oplog)

Będziesz także chciał wykorzystać kursory, które można dostosowywać . Umożliwi to słuchanie zmian zamiast odpytywania ich. Pamiętaj, że replikacja korzysta z kursorów dostosowanych do potrzeb, więc jest to obsługiwana funkcja.


1
hmm ... nie do końca to, co miałem na myśli. W tym momencie prowadzę tylko jedną instancję (bez niewolników). Więc może bardziej podstawowe rozwiązanie?
Andrew

17
Możesz uruchomić serwer z --replSetopcją, która utworzy / wypełni oplog. Nawet bez wtórnego. Jest to zdecydowanie jedyny sposób na „słuchanie” zmian w bazie danych.
Gates VP

2
To jest ładny opis, jak skonfigurować oplog do lokalnego rejestrowania zmian w DB: loosexaml.wordpress.com/2012/09/03/…
johndodo

Coaye! Właśnie tego chcę. I znalazłem bibliotekę o nazwie „mongo-oplog” na npm. Tak szczęśliwy ~
pjincz

Zgadzam się do momentu napisania tej odpowiedzi wyzwalacze mogą nie być dostępne, ale dla wszystkich, którzy tu wylądują, dostępna jest teraz opcja, sprawdź MongoDB Stitch ( docs.mongodb.com/stitch/#stitch ) i wyzwalacze Stitch ( docs. mongodb.com/stitch/triggers ) ..
whoami

102

MongoDB ma co nazywa capped collectionsi tailable cursorsże pozwala MongoDB danych Prześlij na słuchaczy.

A capped collectionto w zasadzie kolekcja o ustalonym rozmiarze i umożliwiająca tylko wstawianie. Oto jak wyglądałoby utworzenie takiego:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable cursors ( oryginalny post Jonathan H. Wage )

Rubin

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (autor: Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (przez Max )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Dodatkowe zasoby:

Samouczek Ruby / Node.js, który przeprowadzi cię przez proces tworzenia aplikacji, która nasłuchuje wstawień w kolekcji z ograniczeniem MongoDB.

Artykuł mówi bardziej szczegółowo o kursorach dostosowanych do potrzeb.

Przykłady użycia kursorów dostosowywanych przez PHP, Ruby, Python i Perl.


70
spać 1? naprawdę? dla kodu produkcyjnego? jak to nie sonduje?
rbp

2
@rbp haha, nigdy nie mówiłem, że to kod produkcyjny, ale masz rację, spanie przez sekundę nie jest dobrą praktyką. Jestem całkiem pewien, że mam ten przykład skądinąd. Nie jestem jednak pewien, jak to zmienić.
Andrew

14
@kroe, ponieważ te nieistotne szczegóły zostaną wprowadzone do kodu produkcyjnego przez nowszych programistów, którzy mogą nie rozumieć, dlaczego jest zły.
Sum

3
Rozumiem twój punkt widzenia, ale oczekiwanie, że niektórzy nowi programiści dodadzą „sleep 1” do produkcji, jest niemal obraźliwe! Nie zdziwiłbym się ... Ale jeśli ktoś wprowadzi to do produkcji, przynajmniej nauczy się na
własnej skórze

19
co jest nie tak z robieniem time.sleep (1) w produkcji?
Al Johri 30.04.16

44

Od wersji MongoDB 3.6 pojawi się nowy interfejs API powiadomień o nazwie Zmień strumienie, którego można użyć do tego celu. Zobacz ten post na blogu jako przykład . Przykład z tego:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

4
Czemu? Czy możesz rozwinąć? To jest teraz standardowy sposób?
Mitar

1
w jaki sposób? nie używaj odpytywania - potrzebujesz podejścia eventowego zamiast pętli while itp.
Alexander Mills

3
Gdzie widzisz tutaj ankietę?
Mitar

Myślę, że on / ona odnosi się do ostatniej pętli. Ale myślę, że PyMongo obsługuje tylko to. Silnik może mieć implementację w stylu detektora asynchronizacji / zdarzenia.
Shane Hsu

41

Sprawdź to: Zmień strumienie

10 stycznia 2018 r. - wersja 3.6

* EDYCJA: Napisałem artykuł o tym, jak to zrobić https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


Jest nowy w Mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Aby użyć changeStreams, baza danych musi być zestawem replikacji

Więcej informacji o zestawach replikacji: https://docs.mongodb.com/manual/replication/

Twoja baza danych będzie domyślnie „ samodzielna ”.

Jak przekonwertować autonomiczny zestaw replik: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


Poniższy przykład jest praktyczną aplikacją, w jaki sposób możesz tego użyć.
* Specjalnie dla węzła.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Przydatne linki:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams


przepraszam za wszystkie zmiany, SO nie podobały mi się moje „Linki” (powiedziałem, że były one nieprawidłowo sformatowanym kodem).
Rio Weber

1
nie powinieneś przesyłać zapytań do bazy danych, myślę, że za pomocą funkcji watch () lub podobnej nowe dane można wysłać na serwer, który nasłuchuje
Alexander Mills,

22

Wersja 3.6 MongoDB zawiera teraz strumienie zmian, które są zasadniczo interfejsem API nad OpLog, umożliwiając przypadki użycia podobne do wyzwalaczy / powiadomień.

Oto link do przykładu Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

Przykład NodeJS może wyglądać mniej więcej tak:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });

JSON.stringify jest bardzo ważny, aby otrzymywać te dane w Android Studio (aplikacja na Androida) ..
DragonFire

3

Alternatywnie można użyć standardowej metody Mongo FindAndUpdate, aw ramach wywołania zwrotnego wywołać zdarzenie EventEmitter (w węźle), gdy wywołanie zwrotne jest uruchomione.

Wszelkie inne części aplikacji lub architektury nasłuchujące tego wydarzenia zostaną powiadomione o aktualizacji, a także wszelkie odpowiednie dane tam przesłane. To naprawdę prosty sposób na otrzymywanie powiadomień z Mongo.


to jest bardzo nieefektywne. Blokujesz db dla każdej FindAndUpdate!
Yash Gupta

1
Domyślam się, że Alex odpowiadał na nieco inne (nie konkretnie adresujące wstawki), ale powiązane pytanie, w jaki sposób wystrzelić jakieś powiadomienie dla klientów, gdy stan kolejki zmieni się, co, jak zakładamy, będzie musiało nastąpić, gdy zadania są odradzane , dokończenie lub niepowodzenie. Gdy klienci są podłączeni za pomocą gniazd sieciowych do węzła, wszyscy mogą być powiadamiani o zmianach za pomocą zdarzenia rozgłoszeniowego na wywołaniu zwrotnym FIndAndUpdate, które można wywołać po odebraniu wiadomości o zmianie stanu. Powiedziałbym, że nie jest to nieefektywne, ponieważ należy dokonać aktualizacji.
Peter Scott

3

Wiele z tych odpowiedzi da tylko nowe rekordy, a nie aktualizacje i / lub są wyjątkowo nieefektywne

Jedynym niezawodnym i wydajnym sposobem na to jest utworzenie dostosowanego kursora w lokalnej kolekcji db: oplog.rs, aby uzyskać WSZYSTKIE zmiany w MongoDB i zrobić to, co chcesz. (MongoDB robi to wewnętrznie mniej więcej w celu obsługi replikacji!)

Wyjaśnienie, co zawiera oplog: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Przykład biblioteki Node.js, która udostępnia API wokół tego, co można zrobić z oplogiem: https://github.com/cayasso/mongo-oplog



1

Istnieje działający przykład Java, który można znaleźć tutaj .

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

Kluczem są OPCJE QUERY podane tutaj .

Możesz także zmienić zapytanie, jeśli nie musisz ładować wszystkich danych za każdym razem.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

1

W rzeczywistości, zamiast oglądać dane wyjściowe, dlaczego nie otrzymujesz powiadomienia, gdy wstawiane jest coś nowego za pomocą oprogramowania pośredniego dostarczonego przez schemat mangusty

Możesz złapać zdarzenie wstawienia nowego dokumentu i zrobić coś po tym wstawieniu


Mój błąd. Przepraszam pana.
Duong Nguyen,

0

Po wersji 3.6 można korzystać z bazy danych następujące typy wyzwalaczy:

  • wyzwalacze sterowane zdarzeniami - przydatne do automatycznej aktualizacji powiązanych dokumentów, powiadamiania usług niższego szczebla, propagowania danych w celu obsługi mieszanych obciążeń, integralności danych i audytu
  • zaplanowane wyzwalacze - przydatne w przypadku zaplanowanych operacji pobierania, propagacji, archiwizacji i analiz danych

Zaloguj się do swojego konta Atlas, wybierz Triggersinterfejs i dodaj nowy wyzwalacz:

wprowadź opis zdjęcia tutaj

Rozwiń każdą sekcję, aby uzyskać więcej ustawień lub szczegółów.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.