Nie myśl, że jest jeszcze obsługiwany. Spójrz na to wydanie JIRA „Dodaj obsługę usuwania tematów”.
Aby usunąć ręcznie:
- Zamknij klaster
- Wyczyść katalog dziennika kafka (określony przez
log.dir
atrybut w pliku konfiguracyjnym kafka ) oraz dane zookeepera
- Uruchom ponownie klaster
W przypadku każdego tematu możesz zrobić
- Przestań kafka
- Wyczyść dziennik kafka specyficzny dla partycji, kafka przechowuje swój plik dziennika w formacie „logDir / topic-partition”, więc dla tematu o nazwie „MyTopic” dziennik partycji o identyfikatorze 0 będzie przechowywany w
/tmp/kafka-logs/MyTopic-0
miejscu /tmp/kafka-logs
określonym przez log.dir
atrybut
- Uruchom ponownie kafka
To jest NOT
dobre i zalecane podejście, ale powinno działać. W pliku konfiguracyjnym brokera Kafka log.retention.hours.per.topic
atrybut jest używany do definiowaniaThe number of hours to keep a log file before deleting it for some specific topic
Czy istnieje również sposób, w jaki wiadomości są usuwane, gdy tylko konsument je przeczyta?
Z dokumentacji Kafki :
Klaster Kafka zachowuje wszystkie opublikowane wiadomości - niezależnie od tego, czy zostały zużyte - przez konfigurowalny okres czasu. Na przykład, jeśli czas przechowywania dziennika jest ustawiony na dwa dni, to przez dwa dni po opublikowaniu wiadomości jest on dostępny do użytku, po czym zostanie wyrzucony w celu zwolnienia miejsca. Wydajność Kafki jest w rzeczywistości stała w odniesieniu do rozmiaru danych, więc przechowywanie dużej ilości danych nie stanowi problemu.
W rzeczywistości jedynymi metadanymi zachowanymi dla każdego konsumenta jest pozycja konsumenta w dzienniku, zwana „przesunięciem”. To przesunięcie jest kontrolowane przez konsumenta: zwykle konsument przesuwa swoje przesunięcie liniowo podczas odczytywania wiadomości, ale w rzeczywistości pozycja jest kontrolowana przez konsumenta i może konsumować wiadomości w dowolnej kolejności. Na przykład konsument może zresetować do starszego przesunięcia w celu ponownego przetworzenia.
Mówią, że za znalezienie przesunięcia początkowego do przeczytania w przykładzie Prostego konsumenta Kafki 0.8
Kafka zawiera dwie stałe, które mogą pomóc, kafka.api.OffsetRequest.EarliestTime()
znajduje początek danych w dziennikach i rozpoczyna strumieniowanie od tego miejsca, kafka.api.OffsetRequest.LatestTime()
będzie przesyłał strumieniowo tylko nowe wiadomości.
Możesz tam również znaleźć przykładowy kod do zarządzania przesunięciem po stronie klienta.
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}