Czy istnieje sposób na usunięcie wszystkich danych z tematu lub usunięcie tematu przed każdym uruchomieniem?


87

Czy istnieje sposób na usunięcie wszystkich danych z tematu lub usunięcie tematu przed każdym uruchomieniem?

Czy mogę zmodyfikować plik KafkaConfig.scala, aby zmienić logRetentionHourswłaściwość? Czy istnieje sposób, w jaki wiadomości są usuwane, gdy tylko konsument je przeczyta?

Używam producentów do pobierania danych skądś i wysyłania danych do określonego tematu, w którym konsument konsumuje, czy mogę usunąć wszystkie dane z tego tematu przy każdym uruchomieniu? Chcę tylko nowych danych za każdym razem w temacie. Czy jest sposób na ponowne zainicjowanie tematu?


Odpowiedzi:


62

Nie myśl, że jest jeszcze obsługiwany. Spójrz na to wydanie JIRA „Dodaj obsługę usuwania tematów”.

Aby usunąć ręcznie:

  1. Zamknij klaster
  2. Wyczyść katalog dziennika kafka (określony przez log.diratrybut w pliku konfiguracyjnym kafka ) oraz dane zookeepera
  3. Uruchom ponownie klaster

W przypadku każdego tematu możesz zrobić

  1. Przestań kafka
  2. Wyczyść dziennik kafka specyficzny dla partycji, kafka przechowuje swój plik dziennika w formacie „logDir / topic-partition”, więc dla tematu o nazwie „MyTopic” dziennik partycji o identyfikatorze 0 będzie przechowywany w /tmp/kafka-logs/MyTopic-0miejscu /tmp/kafka-logsokreślonym przez log.diratrybut
  3. Uruchom ponownie kafka

To jest NOTdobre i zalecane podejście, ale powinno działać. W pliku konfiguracyjnym brokera Kafka log.retention.hours.per.topicatrybut jest używany do definiowaniaThe number of hours to keep a log file before deleting it for some specific topic

Czy istnieje również sposób, w jaki wiadomości są usuwane, gdy tylko konsument je przeczyta?

Z dokumentacji Kafki :

Klaster Kafka zachowuje wszystkie opublikowane wiadomości - niezależnie od tego, czy zostały zużyte - przez konfigurowalny okres czasu. Na przykład, jeśli czas przechowywania dziennika jest ustawiony na dwa dni, to przez dwa dni po opublikowaniu wiadomości jest on dostępny do użytku, po czym zostanie wyrzucony w celu zwolnienia miejsca. Wydajność Kafki jest w rzeczywistości stała w odniesieniu do rozmiaru danych, więc przechowywanie dużej ilości danych nie stanowi problemu.

W rzeczywistości jedynymi metadanymi zachowanymi dla każdego konsumenta jest pozycja konsumenta w dzienniku, zwana „przesunięciem”. To przesunięcie jest kontrolowane przez konsumenta: zwykle konsument przesuwa swoje przesunięcie liniowo podczas odczytywania wiadomości, ale w rzeczywistości pozycja jest kontrolowana przez konsumenta i może konsumować wiadomości w dowolnej kolejności. Na przykład konsument może zresetować do starszego przesunięcia w celu ponownego przetworzenia.

Mówią, że za znalezienie przesunięcia początkowego do przeczytania w przykładzie Prostego konsumenta Kafki 0.8

Kafka zawiera dwie stałe, które mogą pomóc, kafka.api.OffsetRequest.EarliestTime()znajduje początek danych w dziennikach i rozpoczyna strumieniowanie od tego miejsca, kafka.api.OffsetRequest.LatestTime()będzie przesyłał strumieniowo tylko nowe wiadomości.

Możesz tam również znaleźć przykładowy kod do zarządzania przesunięciem po stronie klienta.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

Uważam, że poprawny link do problemu z JIRA to Issues.apache.org/jira/browse/KAFKA-330
asmaier

4
Temat nadal będzie się tutaj pojawiał, ponieważ jest wymieniony w zookeeper. Będziesz musiał rekurencyjnie usunąć wszystko poniżej, brokers/topics/<topic_to_delete>a także dzienniki, aby się go pozbyć.
Zgłoszony

3
Zgodnie z linkiem do wydania możesz usunąć temat po wersji 0.8.1. Szczegółową pomoc można wyświetlić pod adresem kafka-run-class.sh kafka.admin.DeleteTopicCommand.
Jay

5
Aktualizacja: od kafka 0.8.2 polecenie zostało zmienione na:kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Jay Taylor

Myślę, że ta funkcja włączania usuwania tematu została teraz dodana. Prawdopodobnie będzie to miało następne stabilne wydanie.
ha9u63ar

70

Jak wspomniałem tutaj Oczyść kolejkę Kafki :

Testowane w Kafka 0.8.2, dla przykładu szybkiego startu: Najpierw dodaj jedną linię do pliku server.properties w folderze konfiguracyjnym:

delete.topic.enable=true

następnie możesz uruchomić to polecenie:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

2
Przy okazji, po dodaniu opcji nie musisz ponownie uruchamiać serwera Kafka, na wypadek gdyby ktoś się zastanawiał.
oficer problemowy

14

Testowany z kafką 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Uwaga: jeśli usuwasz folder / y tematów wewnątrz dzienników kafka, ale nie z folderu zookeeper-data, zobaczysz, że tematy nadal tam są.


8

Jako brudne obejście można dostosować ustawienia przechowywania w czasie wykonywania dla poszczególnych tematów, np. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1( Może również działać retention.bytes = 0 )

Po chwili kafka powinna zwolnić miejsce. Nie jestem pewien, czy ma to jakieś konsekwencje w porównaniu z ponownym utworzeniem tematu.

ps. Lepiej przywróć ustawienia retencji, gdy kafka zakończy czyszczenie.

Możesz również użyć retention.msdo utrwalenia danych historycznych


8

Poniżej znajdują się skrypty do opróżniania i usuwania tematu Kafki, przyjmując localhost jako serwer zookeeper, a Kafka_Home jest ustawiony na katalog instalacyjny:

Poniższy skrypt opróżni temat, ustawiając jego czas przechowywania na 1 sekundę, a następnie usuwając konfigurację:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

Aby całkowicie usunąć tematy, należy zatrzymać wszystkie odpowiednie brokery kafka i usunąć jego katalog (y) z katalogu dziennika kafka (domyślnie: / tmp / kafka-logs), a następnie uruchomić ten skrypt, aby usunąć temat z zookeepera. Aby sprawdzić, czy został usunięty z zookeepera, dane wyjściowe ls / brokers / topics nie powinny już zawierać tematu:

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF

1
To zadziała tylko wtedy, gdy kontrola retencji nastąpi w ciągu tych 5 sekund snu. Upewnij się, że śpisz, dopóki czek nie przejdzie definitywnie, jak określono tutaj:grep "log.retention.check.interval" $Kafka_Home/config/server.properties
colin

2
Chciałem edytować odpowiedź, ponieważ w pierwszym poleceniu jest mały błąd. Ale edycje jednego znaku nie są dozwolone. Właściwie to --add configraczej nie jest--add-config
SRC

7

Wypróbowaliśmy mniej więcej to, co opisują inne odpowiedzi, z umiarkowanym poziomem sukcesu. To, co naprawdę zadziałało dla nas (Apache Kafka 0.8.1), to polecenie klasy

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181


2
Próbowałem tego w 0.8.1. Polecenie zwraca „usunięcie powiodło się!” jednak nie usuwa partycji w folderach dziennika.
dilm

8
Próbowałem na 0.8.2.1 (homebrew) i daje ten błąd. Error: Could not find or load main class kafka.admin.DeleteTopicCommand
Thanish,

2
Od nowej kafki (0.8.2) jest to sh kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_for_delete] --zookeeper localhost: 2181. Upewnij się, że parametr delete.topic.enable ma wartość true.
Hoàng Long

3

Dla użytkowników piwa

Jeśli używasz czegoś brewtakiego jak ja i tracisz dużo czasu na szukanie niesławnego kafka-logsfolderu, nie bój się więcej. (i daj mi znać, jeśli to działa dla Ciebie i wielu różnych wersji Homebrew, Kafka itp. :))

Prawdopodobnie znajdziesz to pod:

Lokalizacja:

/usr/local/var/lib/kafka-logs


Jak właściwie znaleźć tę ścieżkę

(jest to również przydatne w zasadzie dla każdej aplikacji instalowanej przez brew)

1) brew services list

kafka rozpoczął matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

2) Otwórz i przeczytaj, plistże znalazłeś powyżej

3) Znajdź linię określającą server.propertieslokalizację otwórz ją, w moim przypadku:

  • /usr/local/etc/kafka/server.properties

4) Poszukaj log.dirslinii:

log.dirs = / usr / local / var / lib / kafka-logs

5) Przejdź do tej lokalizacji i usuń dzienniki dotyczące żądanych tematów

6) Zrestartuj Kafkę za pomocą brew services restart kafka


2

Wszystkie dane dotyczące tematów i ich partycji są przechowywane w plikach tmp/kafka-logs/. Ponadto są przechowywane w formacie topic-partionNumber, więc jeśli chcesz usunąć temat newTopic, możesz:

  • przestań kafka
  • usuń pliki rm -rf /tmp/kafka-logs/newTopic-*

1
  1. Zatrzymaj ZooKeeper i Kafka
  2. W pliku server.properties zmień wartość log.retention.hours. Możesz komentować log.retention.hoursi dodawać log.retention.ms=1000. To zachowałoby rekord Kafki Topic tylko przez jedną sekundę.
  3. Uruchom zookeeper i kafka.
  4. Sprawdź w konsoli klienta. Kiedy otworzyłem konsolę po raz pierwszy, był tam rekord. Ale kiedy ponownie otworzyłem konsolę, płyta została usunięta.
  5. Później możesz ustawić wartość log.retention.hoursna żądaną figurę.

1

Od wersji kafka 2.3.0 istnieje alternatywny sposób miękkiego usuwania Kafki (stare podejście jest przestarzałe).

Zaktualizuj retention.ms do 1 sekundy (1000 ms), a następnie ustaw ponownie po minucie, do ustawienia domyślnego, tj. 7 dni (168 godzin, 604 800 000 w ms)

Miękkie usuwanie: - (rentention.ms = 1000) (przy użyciu kafka-configs.sh)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

Ustawienie domyślne: - 7 dni (168 godzin, retention.ms = 604800000)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000


0

Używam tego skryptu:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done

0

Używam poniższego narzędzia do czyszczenia po uruchomieniu testu integracji.

Korzysta z najnowszego AdminZkClientinterfejsu API. Starszy interfejs API został wycofany.

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

Istnieje opcja usunięcia tematu. Ale oznacza temat do usunięcia. Zookeeper później usuwa temat. Ponieważ może to być nieprzewidywalnie długie, wolę podejście retention.ms

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.