RabbitMQ / AMQP: pojedyncza kolejka, wielu odbiorców dla tej samej wiadomości?


146

Właśnie zaczynam ogólnie używać RabbitMQ i AMQP.

  • Mam kolejkę wiadomości
  • Mam wielu konsumentów, którym chciałbym robić różne rzeczy z tym samym przesłaniem .

Wydaje się, że większość dokumentacji RabbitMQ koncentruje się na zasadzie działania okrężnego, tj. Gdy pojedyncza wiadomość jest konsumowana przez jednego konsumenta, a obciążenie rozkłada się na każdego konsumenta. To jest rzeczywiście zachowanie, którego jestem świadkiem.

Przykład: producent ma jedną kolejkę i wysyła komunikaty co 2 sekundy:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

A oto konsument:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

Jeśli dwukrotnie uruchomię konsumenta, widzę, że każdy konsument konsumuje alternatywne komunikaty w trybie okrężnym. Np. Zobaczę wiadomości 1, 3, 5 w jednym terminalu, 2, 4, 6 w drugim .

Moje pytanie brzmi:

  • Czy każdy konsument może otrzymać te same wiadomości? To znaczy, obaj konsumenci otrzymują wiadomość 1, 2, 3, 4, 5, 6? Jak to się nazywa w języku AMQP / RabbitMQ? Jak to jest normalnie skonfigurowane?

  • Czy to się często robi? Czy zamiast tego powinienem po prostu skierować wiadomość do dwóch oddzielnych kolejek z jednym odbiorcą?


5
Nie jestem ekspertem RabbitMQ. Jednak to, co teraz masz, nazywa się kolejką, ale to, czego chcesz, to tematy, zobacz ten samouczek: rabbitmq.com/tutorials/tutorial-five-python.html , więcej o kolejkach i tematach: msdn.microsoft.com/en-us /library/windowsazure/hh367516.aspx
UrbanEsc

1
Wierzę, że faktycznie chce fanoutu, chociaż tematy też będą działać i da większą kontrolę później.
robthewolf

Dzięki @UrbanEsc. Wydaje się, że tematy rozwiązują problem, ponieważ jedna wiadomość trafia do wielu kolejek, a zatem jest używana przez konsumentów w każdej kolejce. Co prowadzi mnie dalej w kierunku scenariusza wielu kolejek / pojedynczego konsumenta dla mojego konkretnego przypadku.
mikemaccana

1
Dla 2018 (a nawet 2016 i wcześniejszych) odpowiedzią jest użycie czegoś takiego jak Kafka, IMO.
WattsInABox

Odpowiedzi:


115

Czy każdy konsument może otrzymać te same wiadomości? To znaczy, obaj konsumenci otrzymują wiadomość 1, 2, 3, 4, 5, 6? Jak to się nazywa w języku AMQP / RabbitMQ? Jak to jest normalnie skonfigurowane?

Nie, jeśli konsumenci są w tej samej kolejce. Z przewodnika RabbitMQ po AMQP Concepts :

ważne jest, aby zrozumieć, że w AMQP 0-9-1 komunikaty są równoważone obciążeniem między konsumentami.

Wydaje się to sugerować, że działanie okrężne w kolejce jest dane i nie można go konfigurować. Oznacza to, że oddzielne kolejki są wymagane, aby ten sam identyfikator wiadomości był obsługiwany przez wielu konsumentów.

Czy to się często robi? Czy zamiast tego powinienem po prostu skierować wiadomość do dwóch oddzielnych kolejek z jednym odbiorcą?

Nie, nie jest, pojedyncza kolejka / wielu konsumentów, z których każdy obsługuje ten sam identyfikator wiadomości, nie jest możliwy. Fakt, że wymiana kieruje wiadomość do dwóch oddzielnych kolejek, jest rzeczywiście lepsza.

Ponieważ nie potrzebuję zbyt skomplikowanego routingu, wymiana fanoutów ładnie sobie z tym poradzi. Nie skupiałem się zbytnio na wymianach wcześniej, ponieważ node-amqp ma koncepcję `` domyślnej wymiany '', która pozwala na publikowanie wiadomości bezpośrednio w połączeniu, jednak większość wiadomości AMQP jest publikowana na określonej giełdzie.

Oto moja wymiana fanoutów, zarówno wysyłająca, jak i odbierająca:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})

1
Rozwinięcie było wyraźnie tym, czego chciałeś. To by ci nie pomogło, ale pomyślałem, że wspomnę, że zachowanie okrężne w kolejce jest konfigurowalne. int prefetchCount = 1; channel.basicQos(prefetchCount); Umożliwi to każdemu konsumentowi otrzymanie wiadomości, gdy tylko skończy się ona z poprzednią. Zamiast otrzymywać naprzemienne wiadomości. Ponownie nie rozwiązuje problemu, ale może być przydatny dla innych osób. przykład tutaj http://www.rabbitmq.com/tutorials/tutorial-two-java.html w ramach Fair Dispatch
Ommit

3
Wyjaśnienie: „domyślna wymiana” nie jest specyficzna dla węzła-amqp. Jest to ogólna koncepcja AMQP z następującymi regułami: gdy dowolny komunikat opublikowany w domyślnej wymianie, klucz routingu (z którym ten komunikat został opublikowany) jest traktowany jako nazwa kolejki przez brokera AMQP. Wygląda więc na to, że możesz publikować bezpośrednio w kolejkach. Ale nie jesteś. Broker po prostu wiąże każdą kolejkę z domyślną wymianą kluczem routingu równym nazwie kolejki.
Ruslan Stelmachenko

2
Czy jest jakaś alternatywa dla tematów Apache activemq jms w rabbitmq, gdzie nie ma kolejek, a raczej multiemisja?
pantonis

Jeśli ten sam użytkownik loguje się z wielu urządzeń, wiadomość otrzyma tylko jedno urządzenie. Jak można rozwiązać ten problem lub proszę o jakiś pomysł?
Rafiq

@Rafiq powinieneś zadać pytanie w tej sprawie.
mikemaccana

28

Przeczytaj samouczek rabbitmq . Publikujesz wiadomość do wymiany, a nie do kolejki; jest następnie kierowany do odpowiednich kolejek. W twoim przypadku powinieneś ustawić oddzielną kolejkę dla każdego konsumenta. W ten sposób mogą odbierać wiadomości całkowicie niezależnie.


27

Ostatnie kilka odpowiedzi jest prawie poprawnych - mam mnóstwo aplikacji, które generują wiadomości, które muszą trafić do różnych konsumentów, więc proces jest bardzo prosty.

Jeśli chcesz, aby wielu konsumentów otrzymywało tę samą wiadomość, wykonaj następującą procedurę.

Utwórz wiele kolejek, po jednej dla każdej aplikacji, która ma otrzymać wiadomość, we właściwościach każdej kolejki, „powiąż” znacznik routingu z wymianą amq.direct. Zmień aplikację do publikowania na wysyłanie do amq.direct i użyj tagu routingu (nie kolejki). AMQP skopiuje następnie wiadomość do każdej kolejki z tym samym powiązaniem. Działa jak marzenie :)

Przykład: Powiedzmy, że mam ciąg JSON, który generuję, publikuję go na giełdzie „amq.direct” za pomocą tagu routingu „nowe-zamówienie-sprzedaży”, mam kolejkę do mojej aplikacji order_printer, która drukuje zamówienie, mam kolejka do mojego systemu rozliczeniowego, który wyśle ​​kopię zamówienia i wystawi fakturę klientowi oraz mam system archiwum internetowego, w którym archiwizuję zamówienia ze względów historycznych / zgodności oraz mam interfejs sieciowy klienta, w którym zamówienia są śledzone w miarę pojawiania się innych informacji porządek.

Więc moje kolejki to: order_printer, order_billing, order_archive i order_tracking. Wszystkie mają przypisany do siebie tag „nowe-zamówienie-sprzedaży”, wszystkie 4 otrzymają dane JSON.

Jest to idealny sposób na wysyłanie danych bez wiedzy aplikacji do publikowania lub dbania o nie.


8

Tak, każdy konsument może otrzymywać te same wiadomości. zajrzyj na http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http: //www.rabbitmq. pl / tutorials / tutorial-five-python.html

na różne sposoby kierowania wiadomości. Wiem, że są przeznaczone dla języków Python i Java, ale dobrze jest zrozumieć zasady, zdecydować, co robisz, a następnie dowiedzieć się, jak to zrobić w JS. Wygląda na to, że chcesz zrobić prosty fanout ( tutorial 3 ), który wysyła wiadomości do wszystkich kolejek podłączonych do giełdy.

Różnica w tym, co robisz i co chcesz robić, polega w zasadzie na tym, że zamierzasz skonfigurować i wymienić lub wpisać fanout. Excahnges fanoutów wysyłają wszystkie wiadomości do wszystkich podłączonych kolejek. Każda kolejka będzie miała konsumenta, który będzie miał dostęp do wszystkich komunikatów oddzielnie.

Tak, jest to powszechnie stosowane, jest to jedna z funkcji AMPQ.


świetna odpowiedź, z wyjątkiem tego, że „czy to się często robi?” Mówiłem o tym, że „każdy konsument otrzymuje te same wiadomości” - co nie jest powszechne (konsumenci w tej samej kolejce zawsze działają w trybie okrężnym). Prawdopodobnie moja wina, że ​​nie byłem wystarczająco jasny.
mikemaccana

Właściwie zaryzykowałbym stwierdzenie, że zależy to od tego, do czego chcesz go użyć. Masz do wyboru dwie podstawowe kolejki pub / sub lub work. Twoja pierwotna konfiguracja była kolejką do pracy, ale chciałeś mieć fanoutowy pub / sub. Chodzi o to, że powszechne użycie tutaj jest całkowicie zależne od tego, co chcesz zrobić.
robthewolf

Jasne, ale w kolejce roboczej ta sama wiadomość (np. Ten sam identyfikator wiadomości) nie jest obsługiwana przez różnych konsumentów - jest to niejawnie okrężne. Znowu jest to prawdopodobnie moja wina, że ​​nie jestem wystarczająco jasny.
mikemaccana

wydaje się, że rozmawiamy tutaj w różnych celach.
robthewolf

Przepraszam za zamieszanie. Jeśli istnieje sposób na utworzenie kolejki roboczej, w której konsumenci w tej samej kolejce obsługują ten sam identyfikator wiadomości, wskaż mi odniesienie. W przeciwnym razie nadal będę wierzyć w to, co przeczytałem gdzie indziej.
mikemaccana


3

RabbitMQ / AMQP: pojedyncza kolejka, wielu odbiorców dla tej samej wiadomości i odświeżania strony.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });

1

Aby uzyskać pożądane zachowanie, po prostu każ każdemu konsumentowi konsumować z własnej kolejki. Będziesz musiał użyć pośredniego typu wymiany (temat, nagłówek, fanout), aby wysłać wiadomość do wszystkich kolejek naraz.


1

Jak oceniam twój przypadek:

  • Mam kolejkę wiadomości (twoje źródło do otrzymywania wiadomości, nazwijmy to q111)

  • Mam wielu konsumentów, którym chciałbym robić różne rzeczy z tym samym przesłaniem.

Twój problem polega na tym, że podczas odbierania 3 wiadomości przez tę kolejkę, wiadomość 1 jest konsumowana przez konsumenta A, inni konsumenci B i C konsumują wiadomości 2 i 3. Gdzie potrzebujesz konfiguracji, w której rabbitmq przekazuje te same kopie wszystkie te trzy komunikaty (1, 2, 3) do wszystkich trzech podłączonych odbiorców (A, B, C) jednocześnie.

Chociaż można wykonać wiele konfiguracji, aby to osiągnąć, prostym sposobem jest zastosowanie następującej dwuetapowej koncepcji:

  • Użyj dynamicznej łopaty rabbitmq, aby odebrać wiadomości z żądanej kolejki (q111) i opublikować je na giełdzie fanoutów (giełda stworzona specjalnie do tego celu).
  • Teraz ponownie skonfiguruj swoich klientów A, B i C (którzy nasłuchiwali kolejki (q111)), aby nasłuchiwać z tej wymiany Fanout bezpośrednio przy użyciu ekskluzywnej i anonimowej kolejki dla każdego konsumenta.

Uwaga: Korzystając z tej koncepcji, nie konsumuj bezpośrednio z kolejki źródłowej (q111), ponieważ wiadomości, które już zostały skonsumowane, nie zostaną przeniesione na giełdę Fanout.

Jeśli uważasz, że to nie spełnia Twoich konkretnych wymagań ... napisz swoje sugestie :-)




-1

W tym scenariuszu jest jedna interesująca opcja, której nie znalazłem w odpowiedziach tutaj.

Możesz Nack wiadomości z funkcją „Requeue” w jednym odbiorcy, aby przetworzyć je w innym. Generalnie nie jest to dobry sposób, ale może komuś wystarczy.

https://www.rabbitmq.com/nack.html

I uważaj na pętle (gdy wszyscy klienci nack + requeue wiadomości)!


1
Zdecydowanie odradzam to, ponieważ nie skaluje się w żaden sposób. Nie ma zamówienia dla konsumentów, nie można zagwarantować konsumentowi B, który go nie zgłosi, otrzyma wiadomość przed konsumentem A, który go przetworzy i ponownie wyliczy, wspomniane pętle są problemem. Jak mówisz „to generalnie nie jest właściwa droga” i nie mogę wymyślić scenariusza, w którym to byłoby lepsze niż inne odpowiedzi.
Kevin Streicher
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.