Używam klienta Confluent.Kafka .NET w wersji 1.3.0. Postępuję zgodnie z dokumentami :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Problemem jest to, że nawet jeśli I ustosunkowania się linii consumer.StoreOffset(consumerResult);
, ciśgle następnej wiadomości niewykorzystanymi następnym razem spożywać , czyli offset stale się powiększa, które nie wydają się być co twierdzi dokumentacja to robi, to znaczy co najmniej jedną dostawę .
Nawet jeśli zestaw I EnableAutoCommit = false
i usunąć „EnableAutoOffsetStore = false” z config i wymienić consumer.StoreOffset(consumerResult)
z consumer.Commit()
, wciąż widzę ten sam problem, czyli nawet jeśli I ustosunkowania się Commit
, nadal trzymać się kolejne niewykorzystanymi wiadomości.
Czuję, że brakuje mi czegoś fundamentalnego, ale nie mogę zrozumieć, co. Każda pomoc jest mile widziana!
EnableAutoCommit
jest ustawione. Powiedzmy, że mamy EnableAutoCommit = false
, a kiedy ja Consume
odzyskuję wiadomość z przesunięciem 11. Spodziewałem się, że będę otrzymywał tę samą wiadomość z przesunięciem 11 w kółko, jeśli przetworzenie komunikatu będzie kontynuowane, a zatem nie Commit
nastąpi wywołanie .
Consume
), używając już Commit
po tym, jak już Subscribe
temat. Kafka (jak w kliencie lib) za sceną zachowuje wszystkie przesunięcia, które wysłał do aplikacji w Consume
i wyśle je liniowo. Aby więc ponownie przetworzyć wiadomość, tak jak w scenariuszu awarii, musisz ją wyśledzić w swoim kodzie i spróbować skompensować i rozpocząć przetwarzanie wiadomości, a także powinieneś wiedzieć, co pominąć, jeśli zostało już przetworzone we wcześniejszych żądaniach. Nie znam biblioteki .net, ale nie powinno to mieć większego znaczenia, ponieważ jest to projekt kafka.