Użyj ponownie tego samego komunikatu, jeśli przetwarzanie komunikatu nie powiedzie się


10

Używam klienta Confluent.Kafka .NET w wersji 1.3.0. Postępuję zgodnie z dokumentami :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

Problemem jest to, że nawet jeśli I ustosunkowania się linii consumer.StoreOffset(consumerResult);, ciśgle następnej wiadomości niewykorzystanymi następnym razem spożywać , czyli offset stale się powiększa, które nie wydają się być co twierdzi dokumentacja to robi, to znaczy co najmniej jedną dostawę .

Nawet jeśli zestaw I EnableAutoCommit = falsei usunąć „EnableAutoOffsetStore = false” z config i wymienić consumer.StoreOffset(consumerResult)z consumer.Commit(), wciąż widzę ten sam problem, czyli nawet jeśli I ustosunkowania się Commit, nadal trzymać się kolejne niewykorzystanymi wiadomości.

Czuję, że brakuje mi czegoś fundamentalnego, ale nie mogę zrozumieć, co. Każda pomoc jest mile widziana!


Wiadomości zostały już zwrócone do aplikacji z punktu widzenia Kafki, więc po zatwierdzeniu są one zapisywane jako ostatnie zatwierdzone przesunięcia, ale konsumpcja będzie nadal zwracać kolejne wiadomości, niezależnie od tego, czy zostały zużyte, czy nie. Jakie są twoje oczekiwania tutaj? Czy mógłbyś wyjaśnić, czego się spodziewasz przed / po zatwierdzeniu i konsumpcji?
Sagar Veeram

Wiadomości nie są ponownie pobierane, dopóki nie użyjesz polecenia „szukaj”, aby je skompensować. Spowoduje to zużycie i wiadomości zostaną zwrócone z wyszukiwania offset.
Sagar Veeram

@ user2683814 W moim poście wspomniałem o dwóch scenariuszach w zależności od tego, co EnableAutoCommitjest ustawione. Powiedzmy, że mamy EnableAutoCommit = false, a kiedy ja Consumeodzyskuję wiadomość z przesunięciem 11. Spodziewałem się, że będę otrzymywał tę samą wiadomość z przesunięciem 11 w kółko, jeśli przetworzenie komunikatu będzie kontynuowane, a zatem nie Commitnastąpi wywołanie .
havij

Nie, tak nie jest. Nie możesz kontrolować, co chcesz odpytać ( Consume), używając już Commitpo tym, jak już Subscribetemat. Kafka (jak w kliencie lib) za sceną zachowuje wszystkie przesunięcia, które wysłał do aplikacji w Consumei wyśle ​​je liniowo. Aby więc ponownie przetworzyć wiadomość, tak jak w scenariuszu awarii, musisz ją wyśledzić w swoim kodzie i spróbować skompensować i rozpocząć przetwarzanie wiadomości, a także powinieneś wiedzieć, co pominąć, jeśli zostało już przetworzone we wcześniejszych żądaniach. Nie znam biblioteki .net, ale nie powinno to mieć większego znaczenia, ponieważ jest to projekt kafka.
Sagar Veeram

Myślę, że musisz użyć kombinacji subskrypcji i przypisania i może potrzebować różnych klientów do obsługi twojego przypadku użycia. W przypadku awarii skorzystaj z funkcji przypisania / wyszukiwania dla partycji tematycznych z jednym konsumentem w celu ponownego przetworzenia wiadomości, a do normalnego przetwarzania użyj innego konsumenta z przepływem subskrypcji / konsumpcji / zatwierdzania.
Sagar Veeram

Odpowiedzi:



0

Możesz chcieć mieć logikę ponownej próby przetwarzania każdej wiadomości przez określoną liczbę razy, np. Powiedzmy 5. Jeśli to się nie powiedzie podczas tych 5 ponownych prób, możesz dodać tę wiadomość do innego tematu dotyczącego obsługi wszystkich nieudane wiadomości, które mają pierwszeństwo przed rzeczywistym tematem. Lub możesz dodać wiadomość, która się nie powiodła, do tego samego tematu, aby mogła zostać odebrana później, gdy wszystkie pozostałe wiadomości zostaną zużyte.

Jeśli przetwarzanie któregokolwiek komunikatu zakończy się powodzeniem w ciągu tych 5 ponownych prób, możesz przejść do następnej wiadomości w kolejce.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.