Najlepszy sposób na przeniesienie wiadomości z DLQ w Amazon SQS?


87

Jaka jest najlepsza praktyka przenoszenia wiadomości z kolejki utraconych wiadomości z powrotem do oryginalnej kolejki w Amazon SQS?

Czy może być

  1. Uzyskaj wiadomość od DLQ
  2. Napisz wiadomość do kolejki
  3. Usuń wiadomość z DLQ

Czy jest prostszy sposób?

Ponadto, czy AWS będzie miało w końcu narzędzie w konsoli do przenoszenia wiadomości z DLQ?



także inny alternatywny github.com/mercury2269/sqsmover
Sergey

Odpowiedzi:


131

Oto szybki hack. To zdecydowanie nie jest najlepsza ani zalecana opcja.

  1. Ustaw główną kolejkę SQS jako DLQ dla rzeczywistego DLQ z maksymalną liczbą odebranych 1.
  2. Wyświetl zawartość w DLQ (Spowoduje to przeniesienie wiadomości do głównej kolejki, ponieważ jest to DLQ dla rzeczywistego DLQ)
  3. Usuń to ustawienie, aby kolejka główna nie była już DLQ rzeczywistego DLQ

12
Tak, to bardzo hack - ale fajna opcja na szybkie rozwiązanie, jeśli wiesz, co robisz i nie masz czasu, aby rozwiązać ten problem we właściwy sposób #yolo
Thomas Watson

14
Ale liczba odebranych nie jest resetowana do 0, gdy to zrobisz. Bądź ostrożny.
Rajdeep Siddhapura

1
Właściwym podejściem jest skonfigurowanie polityki Redrive w SQS z maksymalną liczbą odebranych wiadomości i automatycznie przeniesie wiadomość do DLQ, gdy przekroczy ustawioną liczbę odebranych, a następnie napisze wątek czytnika do odczytu z DLQ.
Ash

5
Jesteś geniuszem.
JefClaes

1
Stworzyłem narzędzie CLI do tego problemu kilka miesięcy temu: github.com/renanvieira/phoenix-letter
MaltMaster

14

Istnieje kilka skryptów, które robią to za Ciebie:

# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 

1
To najprostszy sposób, w przeciwieństwie do zaakceptowanej odpowiedzi. Po prostu uruchom to z terminala, który ma ustawioną właściwość AWS env vars:npx replay-aws-dlq DL_URI MAIN_URI
Vasyl Boroviak

Uwaga literówka: dql -> dlq # install npm install replay-aws-dlq;
Lee Oades,

To zadziałało dla mnie bez zarzutu (uwaga, próbowałem tylko tego opartego na go). Wydawało się, że przesuwał wiadomości etapami, a nie wszystkie naraz (dobrze), a nawet miał pasek postępu. Lepsza niż akceptowana odpowiedź IMO.
Jewgienij Ananin

13

Nie musisz przenosić wiadomości, ponieważ wiąże się ona z wieloma innymi wyzwaniami, takimi jak zduplikowane wiadomości, scenariusze odzyskiwania, utracona wiadomość, kontrola deduplikacji itp.

Oto rozwiązanie, które wdrożyliśmy -

Zwykle używamy DLQ do przejściowych błędów, a nie do trwałych błędów. Tak więc przyjęto poniżej podejście -

  1. Przeczytaj wiadomość z DLQ jak zwykłą kolejkę

    Korzyści
    • Aby uniknąć podwójnego przetwarzania wiadomości
    • Lepsza kontrola nad DLQ - tak jak umieściłem czek, aby przetwarzać tylko wtedy, gdy zwykła kolejka jest całkowicie przetworzona.
    • Skaluj proces na podstawie komunikatu w DLQ
  2. Następnie postępuj zgodnie z tym samym kodem, który podąża zwykła kolejka.

  3. Bardziej niezawodny w przypadku przerwania zadania lub przerwania procesu podczas przetwarzania (np. Zabicie instancji lub zakończenie procesu)

    Korzyści
    • Możliwość ponownego wykorzystania kodu
    • Obsługa błędów
    • Odzyskiwanie i ponowne odtwarzanie wiadomości
  4. Zwiększ widoczność wiadomości, aby żaden inny wątek ich nie przetwarzał.

    Zasiłek
    • Unikaj przetwarzania tego samego rekordu przez wiele wątków.
  5. Usuń wiadomość tylko w przypadku trwałego błędu lub pomyślnego zakończenia.

    Zasiłek
    • Kontynuuj przetwarzanie do momentu wystąpienia przejściowego błędu.

Bardzo podoba mi się twoje podejście! Jak w tym przypadku zdefiniujesz „trwały błąd”?
DMac the Destroyer,

Cokolwiek większe niż kod stanu HTTP> 200 <500 jest trwałym błędem
Ash

jest to rzeczywiście dobre podejście w produkcji. jednak myślę, że ten post zawiera po prostu pytanie, jak ponownie wysłać wiadomości z DLQ do normalnej kolejki. co czasami przydaje się, jeśli wiesz, co robisz.
linehrr

To właśnie mówię, że nie powinieneś tego robić. Ponieważ jeśli to zrobisz, spowoduje to więcej problemów. Możemy przenieść wiadomość jak każdą inną wiadomość push, ale stracimy funkcje DLQ, takie jak liczba odebranych, widoczność i wszystko. Będzie to traktowane jako nowa wiadomość.
Ash

6

To wygląda na najlepszą opcję. Istnieje możliwość, że proces zakończy się niepowodzeniem po kroku 2. W takim przypadku skopiujesz wiadomość dwukrotnie, ale aplikacja powinna obsługiwać ponowne dostarczanie wiadomości (lub nie przejmować się tym).


6

tutaj:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()

Czy to jest Python?
carlin.scott

właściwie python2
Kristof Jozsa

4

Jest inny sposób na osiągnięcie tego bez pisania jednej linii kodu. Rozważmy, że rzeczywista nazwa kolejki to SQS_Queue, a DLQ dla niej to SQS_DLQ. Teraz wykonaj następujące kroki:

  1. Ustaw SQS_Queue jako dlq z SQS_DLQ. Ponieważ SQS_DLQ jest już dlq SQS_Queue. Teraz oba działają jako DLQ drugiego.
  2. Ustaw maksymalną liczbę odebranych SQS_DLQ na 1.
  3. Teraz przeczytaj wiadomości z konsoli SQS_DLQ. Ponieważ liczba odebranych wiadomości wynosi 1, wszystkie wiadomości zostaną przesłane do własnego serwera DLQ, który jest rzeczywistą kolejką SQS_Queue.

To zniweczy cel utrzymywania DLQ. DLQ ma na celu zapobieganie przeładowaniu systemu podczas obserwowania awarii, aby można było to zrobić później.
Buddha

Z pewnością pokona cel i nie będziesz w stanie osiągnąć innych korzyści, takich jak skalowanie w górę, dławienie i licznik otrzymywania. Co więcej, jako kolejkę przetwarzania należy użyć zwykłej kolejki i jeśli liczba odebranych wiadomości osiągnie „N”, powinna trafić do DLQ. Tak powinno być idealnie.
Ash

3
Jako jednorazowe rozwiązanie do ponownego przesłania wielu wiadomości, działa to jak urok. Nie jest to jednak dobre długoterminowe rozwiązanie.
nmio

Tak, jest to niezwykle cenne jako jednorazowe rozwiązanie do przekierowania wiadomości (po naprawieniu problemu w głównej kolejce). Na AWS CLI polecenie użyłem to: aws sqs receive-message --queue-url <url of DLQ> --max-number-of-messages 10. Ponieważ maksymalna liczba wiadomości, które można odczytać na 10, sugeruję uruchomienie polecenia w takiej pętli:for i in {1..1000}; do <CMD>; done
Patrick Finnigan

3

Napisałem mały skrypt w Pythonie, aby to zrobić, używając boto3 lib:

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

możesz pobrać ten skrypt w tym linku

ten skrypt w zasadzie może przenosić wiadomości między dowolnymi kolejkami. i obsługuje kolejki FIFO, jak również możesz podać message_group_idpole.


3

Używamy następującego skryptu, aby przekierować wiadomość z kolejki src do kolejki tgt:

Nazwa pliku: redrive.py

stosowanie: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive message in (src) queue to (tgt) queue

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1. 
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.

Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    queue_url = sqs.get_queue_url(QueueName=queue_name)
    return True if queue_url.get('QueueUrl') else False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()

0

DLQ wchodzi w grę tylko wtedy, gdy pierwotny konsument nie może pomyślnie odebrać wiadomości po różnych próbach. Nie chcemy usuwać wiadomości, ponieważ uważamy, że nadal możemy coś z nią zrobić (może spróbować ponownie przetworzyć ją, zarejestrować lub zebrać jakieś statystyki) i nie chcemy ciągle napotykać tej wiadomości i zatrzymać możliwość przetwarzać inne wiadomości za tym.

DLQ to nic innego jak kolejka. Co oznacza, że ​​musielibyśmy napisać konsumenta dla DLQ, który idealnie działałby rzadziej (w porównaniu z oryginalną kolejką), który zużywałby z DLQ i tworzył komunikat z powrotem do oryginalnej kolejki i usuwał go z DLQ - jeśli takie jest zamierzone zachowanie i uważamy pierwotny konsument byłby teraz gotowy do ponownego przetworzenia. Powinno być OK, jeśli ten cykl będzie trwał przez jakiś czas, ponieważ teraz mamy również możliwość ręcznego sprawdzenia i wprowadzenia niezbędnych zmian oraz wdrożenia kolejnej wersji oryginalnego konsumenta bez utraty wiadomości (oczywiście w okresie przechowywania wiadomości - czyli 4 dni do domyślna).

Byłoby miło, gdyby AWS zapewniał tę możliwość od razu po wyjęciu z pudełka, ale jeszcze tego nie widzę - pozostawiają to użytkownikowi końcowemu, aby używał go w sposób, który czuje się odpowiedni.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.