Używam klienta Confluent.Kafka .NET w wersji 1.3.0. Postępuję zgodnie z dokumentami :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Problemem jest to, że nawet jeśli I ustosunkowania się linii consumer.StoreOffset(consumerResult);, ciśgle następnej wiadomości niewykorzystanymi następnym razem spożywać , czyli offset stale się powiększa, które nie wydają się być co twierdzi dokumentacja to robi, to znaczy co najmniej jedną dostawę .
Nawet jeśli zestaw I EnableAutoCommit = falsei usunąć „EnableAutoOffsetStore = false” z config i wymienić consumer.StoreOffset(consumerResult)z consumer.Commit(), wciąż widzę ten sam problem, czyli nawet jeśli I ustosunkowania się Commit, nadal trzymać się kolejne niewykorzystanymi wiadomości.
Czuję, że brakuje mi czegoś fundamentalnego, ale nie mogę zrozumieć, co. Każda pomoc jest mile widziana!
EnableAutoCommitjest ustawione. Powiedzmy, że mamy EnableAutoCommit = false, a kiedy ja Consumeodzyskuję wiadomość z przesunięciem 11. Spodziewałem się, że będę otrzymywał tę samą wiadomość z przesunięciem 11 w kółko, jeśli przetworzenie komunikatu będzie kontynuowane, a zatem nie Commitnastąpi wywołanie .
Consume), używając już Commitpo tym, jak już Subscribetemat. Kafka (jak w kliencie lib) za sceną zachowuje wszystkie przesunięcia, które wysłał do aplikacji w Consumei wyśle je liniowo. Aby więc ponownie przetworzyć wiadomość, tak jak w scenariuszu awarii, musisz ją wyśledzić w swoim kodzie i spróbować skompensować i rozpocząć przetwarzanie wiadomości, a także powinieneś wiedzieć, co pominąć, jeśli zostało już przetworzone we wcześniejszych żądaniach. Nie znam biblioteki .net, ale nie powinno to mieć większego znaczenia, ponieważ jest to projekt kafka.