Oczyść temat Kafka


185

Czy istnieje sposób na oczyszczenie tematu w kafce?

Przesunąłem wiadomość, która była zbyt duża, do tematu wiadomości Kafka na mojej lokalnej maszynie, teraz pojawia się błąd:

kafka.common.InvalidMessageSizeException: invalid message size

Zwiększanie liczby fetch.sizenie jest tutaj idealne, ponieważ tak naprawdę nie chcę akceptować tak dużych wiadomości.

Odpowiedzi:


359

Tymczasowo zaktualizuj czas przechowywania w temacie do jednej sekundy:

kafka-topics.sh --zookeeper <zkhost>:2181 --alter --topic <topic name> --config retention.ms=1000

W nowszych wersjach Kafki możesz to również zrobić kafka-configs --entity-type topics

kafka-configs.sh --zookeeper <zkhost>:2181 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000

następnie poczekaj, aż oczyszczanie zacznie działać (około minuty). Po wyczyszczeniu przywróć poprzednią retention.mswartość.


8
To świetna odpowiedź, ale czy możesz dodać opis, jak zacząć od sprawdzenia aktualnej wartości retention.ms tematu?
Greg Dubicki

28
Nie jestem pewien, czy sprawdzę aktualną konfigurację, ale wierzę, że przywrócenie jej do wartości domyślnych wygląda następująco:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
aspergillusOryzae

15
Lub w zależności od wersji:--delete-config retention.ms
aspergillusOryzae

3
tylko fyi, dla Kafka v. 0.9.0.0, mówi: ubuntu @ ip-172-31-21-201: /opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin / kafka-topics.sh - -zookeeper localhost: 2181 --alter --topic room-data --config retention.ms = 1000 OSTRZEŻENIE: Zmiana konfiguracji tematu z tego skryptu jest nieaktualna i może zostać usunięta w przyszłych wydaniach. Idąc dalej, skorzystaj z kafka-configs.sh dla tej funkcjonalności
Alper Akture

54
Wydaje się, że od wersji 0.9.0 używanie pliku kafka-topics.sh w celu zmiany konfiguracji jest przestarzałe. Nowa opcja polega na użyciu skryptu kafka-configs.sh. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 Pozwala to również sprawdzić bieżący okres przechowywania, np. Kafka-configs --zookeeper <zkhost>: 2181 --describe - tematy typu -entity - nazwa-name <nazwa tematu>
RHE

70

Aby wyczyścić kolejkę, możesz usunąć temat:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

następnie utwórz go ponownie:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

14
Pamiętaj, aby dodać wiersz delete.topic.enable=truew pliku config/server.properties, jak mówi ostrzeżenie wydrukowane przez wspomniane polecenieNote: This will have no impact if delete.topic.enable is not set to true.
Patrizio Bertoni

3
Nie zawsze jest to natychmiastowe. Czasami oznacza to po prostu usunięcie, a faktyczne usunięcie nastąpi później.
Gaurav Khare

48

Oto kroki, które wykonuję, aby usunąć temat o nazwie MyTopic:

  1. Opisz temat i nie bierz identyfikatorów brokera
  2. Zatrzymaj demona Apache Kafka dla każdego identyfikatora brokera na liście.
  3. Połącz się z każdym brokerem i usuń folder danych tematu, np rm -rf /tmp/kafka-logs/MyTopic-0. Powtórz dla innych partycji i wszystkich replik
  4. Usuń metadane tematu: zkCli.shnastępniermr /brokers/MyTopic
  5. Uruchom demona Apache Kafka dla każdego zatrzymanego komputera

Jeśli przegapisz krok 3, to Apache Kafka będzie nadal zgłaszać ten temat jako obecny (na przykład, jeśli uruchomisz kafka-list-topic.sh).

Testowane z Apache Kafka 0.8.0.


2
w ./zookeeper-shell.sh localhost:2181./kafka-topics.sh --list --zookeeper localhost:2181
0.8.1

Można użyć zookeeper-clientzamiast zkCli.sh(wypróbowany na Cloudera CDH5)
Martin Tapp

1
Spowoduje to usunięcie tematu, a nie zawartych w nim danych. Wymaga to zatrzymania Brokera. To w najlepszym razie hack. Odpowiedź Stevena Appleyarda jest naprawdę najlepsza.
Jeff Maass

1
To był jedyny sposób, w którym został napisany.
Thomas Bratt

2
Pracowałem dla mnie nad Kafką 0.8.2.1, chociaż topis w zookeeper był pod / brokers / topics / <nazwa tematu tutaj>
codecraig

44

Chociaż przyjęta odpowiedź jest poprawna, metoda ta jest przestarzała. Konfiguracja tematu powinna być teraz wykonana za pośrednictwem kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

Konfiguracje ustawione za pomocą tej metody można wyświetlić za pomocą polecenia

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

2
Warto również dodać:kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
NoBrainer

38

Testowany w Kafka 0.8.2, na przykład szybkiego uruchamiania: Najpierw dodaj jedną linię do pliku server.properties w folderze config:

delete.topic.enable=true

następnie możesz uruchomić to polecenie:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

6

Od kafka 1.1

Wyczyść temat

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

poczekaj 1 minutę, aby być pewnym, że kafka wyczyści temat, usuń konfigurację, a następnie przejdź do wartości domyślnej

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms

1
Myślę, że masz dodatkową strzałę. W moim przypadku mogłem biecbin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
Will

4

kafka nie ma bezpośredniej metody czyszczenia / czyszczenia tematu (kolejek), ale może to zrobić poprzez usunięcie tego tematu i odtworzenie go.

najpierw upewnij się, że plik sever.properties ma, a jeśli nie, dodaj delete.topic.enable=true

następnie Usuń temat bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

następnie utwórz go ponownie.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

4

Czasami, jeśli masz nasycony klaster (zbyt wiele partycji, używasz zaszyfrowanych danych tematów lub używasz protokołu SSL lub kontroler znajduje się w złym węźle lub połączenie jest niestabilne, wyczyszczenie tego tematu zajmie dużo czasu .

Wykonuję te kroki, szczególnie jeśli używasz Avro.

1: Uruchom z narzędziami Kafka:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Uruchom w węźle rejestru Schemat:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: Przywróć zachowanie tematu do pierwotnego ustawienia, gdy temat będzie pusty.

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

Mam nadzieję, że to komuś pomoże, ponieważ nie jest łatwo reklamowane.


Uwaga: kafka-avro-console-consumernie jest konieczne
OneCricketeer

4

AKTUALIZACJA: Ta odpowiedź dotyczy Kafka 0.6. Dla Kafki 0.8 i późniejszych patrz odpowiedź @Patrick.

Tak, zatrzymaj Kafka i ręcznie usuń wszystkie pliki z odpowiedniego podkatalogu (łatwo je znaleźć w katalogu danych Kafka). Po ponownym uruchomieniu Kafka temat będzie pusty.


Wymaga to obalenia Brokera, a co najwyżej włamania. Odpowiedź Stevena Appleyarda jest naprawdę najlepsza.
Jeff Maass

@MaasSql Zgadzam się. :) Ta odpowiedź ma dwa lata, około wersji 0.6. Funkcje „zmień temat” i „usuń temat” zostały zaimplementowane później.
Wildfire

Odpowiedź Stevena Appleyarda jest równie zuchwała, jak ta.
Banjocat

Posługiwanie się uchwytem aplikacji przez usuwanie własnych danych w obsługiwany sposób jest o wiele mniej kłopotliwe niż wyłączenie tej aplikacji i usunięcie, jak myślisz, wszystkich plików danych, a następnie włączenie jej z powrotem.
Nick

3

Najprostszym podejściem jest ustawienie daty poszczególnych plików dziennika na starsze niż okres przechowywania. Następnie broker powinien je wyczyścić i usunąć dla Ciebie w ciągu kilku sekund. Ma to kilka zalet:

  1. Nie ma potrzeby wyłączania brokerów, to operacja uruchomieniowa.
  2. Zapobiega możliwości nieprawidłowych wyjątków przesunięcia (więcej na ten temat poniżej).

Z mojego doświadczenia z Kafka 0.7.x usunięcie plików dziennika i ponowne uruchomienie brokera może prowadzić do nieprawidłowych wyjątków przesunięcia dla niektórych konsumentów. Stałoby się tak, ponieważ broker ponownie uruchamia przesunięcia na zero (przy braku istniejących plików dziennika), a konsument, który wcześniej konsumował z tematu, połączyłby się ponownie, aby zażądać określonego [raz poprawnego] przesunięcia. Jeśli to przesunięcie nie mieści się w granicach nowych dzienników tematów, nie ma żadnych szkód, a konsument wznawia na początku lub na końcu. Ale jeśli przesunięcie mieści się w granicach nowych dzienników tematów, broker próbuje pobrać zestaw komunikatów, ale kończy się niepowodzeniem, ponieważ przesunięcie nie jest wyrównane z rzeczywistym komunikatem.

Można to złagodzić, usuwając również przesunięcia konsumentów w zookeeper dla tego tematu. Ale jeśli nie potrzebujesz dziewiczego tematu i chcesz po prostu usunąć istniejącą zawartość, po prostu „dotknięcie” kilku dzienników tematów jest o wiele łatwiejsze i bardziej niezawodne niż zatrzymywanie brokerów, usuwanie dzienników tematów i czyszczenie niektórych węzłów dozorcy .


jak „ustawić datę poszczególnych plików dziennika, aby były starsze niż okres przechowywania”? dzięki
bylijinnan

3

Rada Thomasa jest świetna, ale niestety zkCliw starych wersjach Zookeepera (na przykład 3.3.6) nie wydaje się wspierać rmr. Na przykład porównaj implementację wiersza poleceń we współczesnym Zookeeperze z wersją 3.3 .

Jeśli masz do czynienia ze starą wersją Zookeepera, jednym z rozwiązań jest użycie biblioteki klienta takiej jak zc.zk dla Pythona. Dla osób, które nie znają Pythona, musisz zainstalować go za pomocą pip lub easy_install . Następnie uruchom powłokę Pythona ( python) i możesz:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

lub nawet

zk.delete_recursive('brokers')

jeśli chcesz usunąć wszystkie tematy z Kafki.


2

Aby wyczyścić wszystkie wiadomości z określonego tematu przy użyciu grupy aplikacji (nazwa_grupy powinna być taka sama jak nazwa grupy aplikacji kafka).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group


Istnieje problem z tym podejściem (testowany w 0.8.1.1). Jeśli aplikacja zasubskrybuje dwa (lub więcej) tematów: temat1 i temat2, a konsument konsoli wyczyści temat1, niestety usuwa również niepowiązane przesunięcie konsumenta dla tematu2, co powoduje odtworzenie wszystkich wiadomości z tematu2.
jsh

2

Po odpowiedzi na @steven appleyard wykonałem następujące polecenia na Kafce 2.2.0 i zadziałały dla mnie.

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

To zdaje się powielać inne odpowiedzi
OneCricketeer

2

Wiele świetnych odpowiedzi tutaj, ale wśród nich nie znalazłem żadnej o dokerze. Spędziłem trochę czasu, aby dowiedzieć się, że użycie kontenera brokera jest niewłaściwe w tym przypadku (oczywiście !!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

i powinienem był użyć zookeeper:2181zamiast --zookeeper localhost:2181jak na mój plik tworzenia

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

poprawne byłoby polecenie

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

Mam nadzieję, że pozwoli to komuś zaoszczędzić czas.

Pamiętaj również, że wiadomości nie zostaną natychmiast usunięte i zdarzy się to, gdy segment dziennika zostanie zamknięty.


Możesz wykonać egzekucję na brokerze. Problemem jest localhost:2181... Np. Nie rozumiesz funkcji sieciowych Dockera. Ponadto nie wszystkie pojemniki Zookeeper mają kafka-topics, więc najlepiej nie używać go w ten sposób. Najnowsze instalacje Kafka pozwalają na --bootstrap-serverszmianę tematu zamiast--zookeeper
OneCricketeer

1
Mimo to egzekucja w kontenerze Zookeepera wydaje się nieprawidłowa. you can use --zookeeper zookeeper: 2181` z kontenera Kafka jest moim celem. Lub nawet wyodrębnij wiersz Zookeeper z pliku server.properties
OneCricketeer

@ cricket_007 hej, dziękuję za to, naprawdę poprawiłem odpowiedź, daj mi znać, jeśli coś jest nadal nie tak
Vladimir Semashkin

1

Nie można dodać jako komentarza ze względu na rozmiar: Nie jestem pewien, czy to prawda, oprócz aktualizacji retention.ms i retention.bytes, ale zauważyłem, że zasadą czyszczenia tematów powinno być „usuwanie” (domyślnie), jeśli „kompaktowe”, to będzie trzymaj wiadomości dłużej, tzn. jeśli są „zwarte”, musisz również określić delete.retention.ms .

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

Musiałem również monitorować najwcześniejsze / najnowsze przesunięcia powinny być takie same, aby potwierdzić, że to się pomyślnie wydarzyło, można również sprawdzić du -h / tmp / kafka-logs / test-topic-3-100- *

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

Innym problemem jest to, trzeba uzyskać bieżący config pierwszy więc pamiętać, aby powrócić po usunięciu powiedzie: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics


1

Innym, raczej ręcznym podejściem do usuwania tematu jest:

u brokerów:

  1. zatrzymać brokera kafka
    sudo service kafka stop
  2. usuń wszystkie pliki dziennika partycji (należy to zrobić na wszystkich brokerach)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

w dozorcy:

  1. uruchom interfejs wiersza poleceń zookeeper
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. użyj zkCli, aby usunąć metadane tematu
    rmr /brokers/topic/<some_topic_name>

ponownie u brokerów:

  1. zrestartuj usługę brokera
    sudo service kafka start

Musisz zatrzymać i usunąć pliki z każdego brokera za pomocą repliki, co oznacza, że ​​możesz mieć w ten sposób przestoje klienta
OneCricketeer

1
masz rację, ta pozwala po prostu zobaczyć, gdzie niektóre rzeczy są przechowywane i zarządzane przez Kafkę. ale to podejście brutalnej siły zdecydowanie nie jest przeznaczone dla systemu działającego w produkcji.
Danny Mor

1
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

To powinno dać retention.msskonfigurowane. Następnie możesz użyć powyższej komendy alter, aby zmienić na 1 sekundę (a później wrócić do ustawień domyślnych).

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000

1

Z Java, używając nowego AdminZkClientzamiast przestarzałego AdminUtils:

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }

Nie potrzebujesz Zookeepera. Użyj AdminClientlubKafkaAdminClient
OneCricketeer
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.