Java, Jak uzyskać liczbę wiadomości w temacie w apache kafka


93

Używam apache kafka do przesyłania wiadomości. Wdrożyłem producenta i konsumenta w Javie. Jak możemy uzyskać liczbę wiadomości w temacie?

Odpowiedzi:


34

Jedynym sposobem, który przychodzi na myśl z punktu widzenia konsumenta, jest faktyczne skonsumowanie wiadomości i ich policzenie.

Broker Kafka ujawnia liczniki JMX dla liczby wiadomości odebranych od momentu uruchomienia, ale nie można wiedzieć, ile z nich zostało już wyczyszczonych.

W większości scenariuszy wiadomości w Kafce są najlepiej postrzegane jako nieskończony strumień, a uzyskanie dyskretnej wartości tego, ile jest obecnie przechowywanych na dysku, nie ma znaczenia. Co więcej, sytuacja staje się bardziej skomplikowana, gdy mamy do czynienia z grupą brokerów, z których każdy ma podzbiór wiadomości w temacie.


Zobacz moją odpowiedź stackoverflow.com/a/47313863/2017567 . Klient Java Kafka umożliwia uzyskanie tych informacji.
Christophe Quintard

96

To nie jest Java, ale może być przydatne

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'

12
Czy nie powinna to być różnica najwcześniejszego i ostatniego przesunięcia na sumę podziału? bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609 A potem różnica zwraca faktyczne wiadomości oczekujące w temacie? Mam rację?
kisna

1
Tak to prawda. Musisz obliczyć różnicę, jeśli najwcześniejsze przesunięcia nie są równe zeru.
ssemiczew

Tak myślałem :).
kisna

1
Czy istnieje JAKIEKOLWIEK sposób na użycie tego jako API, a więc wewnątrz kodu (JAVA, Scala lub Python)?
salvob

Oto połączenie mojego kodu i kodu z Kafki. Może się przydać. Użyłem go do przesyłania strumieniowego Spark - integracja Kafka KafkaClient gist.github.com/ssemichev/c2d94dce7ad65339c9637e1b461f86cf KafkaCluster gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8ab54ffbc5865
ssemichev

17

Właściwie używam tego do testowania mojego POC. Element, którego chcesz użyć ConsumerOffsetChecker. Możesz go uruchomić za pomocą skryptu bash, jak poniżej.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

A poniżej wynik: wprowadź opis obrazu tutaj Jak widać na czerwonym polu, 999 to numer wiadomości aktualnie w temacie.

Aktualizacja: ConsumerOffsetChecker jest przestarzały od 0.10.0, możesz zacząć używać ConsumerGroupCommand.


1
Należy pamiętać, że ConsumerOffsetChecker jest przestarzały i zostanie usunięty w wersjach następujących po 0.9.0. Zamiast tego użyj ConsumerGroupCommand. (kafka.tools.ConsumerOffsetChecker $)
Szymon Sadło

1
Tak, to właśnie powiedziałem.
Rudy

Twoje ostatnie zdanie nie jest poprawne. Powyższe polecenie nadal działa w wersji 0.10.0.1, a ostrzeżenie jest takie samo jak mój poprzedni komentarz.
Szymon Sadło

14

Czasami interesuje nas znajomość liczby komunikatów w każdej partycji, na przykład podczas testowania niestandardowego partycji. Kolejne kroki zostały przetestowane pod kątem współpracy z Kafką 0.10.2.1-2 z Confluent 3.2. Biorąc pod uwagę temat Kafka kti następujący wiersz poleceń:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

To drukuje przykładowe dane wyjściowe pokazujące liczbę wiadomości w trzech partycjach:

kt:2:6138
kt:1:6123
kt:0:6137

Liczba wierszy może być mniej więcej w zależności od liczby partycji w temacie.


4
Jeśli włączono kompaktowanie dziennika, sumowanie przesunięć partycji może nie dać dokładnej liczby komunikatów w temacie.

14

Ponieważ ConsumerOffsetCheckernie jest już obsługiwane, możesz użyć tego polecenia, aby sprawdzić wszystkie wiadomości w temacie:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

Gdzie LAGjest liczba wiadomości w partycji tematu:

wprowadź opis obrazu tutaj

Możesz także spróbować użyć kafkacat . Jest to projekt open source, który może pomóc w czytaniu wiadomości z tematu i partycji oraz drukowaniu ich na standardowe wyjście. Oto przykład, który odczytuje 10 ostatnich wiadomości z sample-kafka-topictematu, a następnie kończy:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e

11

Użyj https://prestodb.io/docs/current/connector/kafka-tutorial.html

Super silnik SQL dostarczony przez Facebooka, który łączy się z kilkoma źródłami danych (Cassandra, Kafka, JMX, Redis ...).

PrestoDB działa jako serwer z opcjonalnymi pracownikami (istnieje tryb autonomiczny bez dodatkowych pracowników), wtedy do tworzenia zapytań używasz małego wykonywalnego pliku JAR (zwanego presto CLI).

Po prawidłowym skonfigurowaniu serwera Presto możesz używać tradycyjnego SQL:

SELECT count(*) FROM TOPIC_NAME;

to narzędzie jest fajne, ale jeśli nie zadziała, jeśli Twój temat ma więcej niż 2 kropki.
armandfp

7

Polecenie Apache Kafka, aby uzyskać nieobsłużone komunikaty na wszystkich partycjach tematu:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Wydruki:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Kolumna 6 zawiera nieobsłużone wiadomości. Dodaj je w ten sposób:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

awk czyta wiersze, pomija wiersz nagłówka i dodaje szóstą kolumnę, a na końcu wypisuje sumę.

Wydruki

5

5

Aby uzyskać wszystkie wiadomości zapisane dla tematu, możesz wyszukać konsumenta na początku i na końcu strumienia dla każdej partycji i zsumować wyniki

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());

1
btw, jeśli masz włączone kompaktowanie, w strumieniu mogą występować przerwy, więc rzeczywista liczba wiadomości może być niższa niż całkowita obliczona tutaj. Aby uzyskać dokładną sumę, będziesz musiał odtworzyć wiadomości i policzyć je.
AutomatedMike

5

Uruchom następujące (zakładając, że kafka-console-consumer.shjest na ścieżce):

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"

Uwaga: usunąłem --new-consumertę opcję, ponieważ ta opcja nie jest już dostępna (lub najwyraźniej konieczna)
StephenBoesch

3

Korzystając z klienta Java Kafki 2.11-1.0.0, możesz wykonać następujące czynności:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

Wynik wygląda mniej więcej tak:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13

Wolę odpowiedzieć porównując do @AutomatedMike odpowiedź ponieważ odpowiedź nie bałagan z seekToEnd(..)i seekToBeginning(..)metody, które zmieniają stan consumer.
adasław

2

W najnowszych wersjach programu Kafka Manager znajduje się kolumna zatytułowana Sumowane ostatnie przesunięcia .

wprowadź opis obrazu tutaj


2

Miałem to samo pytanie i tak to robię od Konsumenta Kafki w Kotlinie:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

Bardzo zgrubny kod, ponieważ właśnie to uruchomiłem, ale zasadniczo chcesz odjąć przesunięcie początku tematu od przesunięcia końcowego i będzie to bieżąca liczba wiadomości w temacie.

Nie możesz polegać tylko na przesunięciu końcowym ze względu na inne konfiguracje (zasady czyszczenia, retention-ms, itp.), Które mogą spowodować usunięcie starych wiadomości z tematu. Przesunięcia „przesuwają się” tylko do przodu, więc jest to początkowe przesunięcie, które przesunie się do przodu bliżej końcowego przesunięcia (lub ostatecznie do tej samej wartości, jeśli temat nie zawiera w tej chwili wiadomości).

Zasadniczo przesunięcie końcowe reprezentuje całkowitą liczbę wiadomości, które przeszły przez ten temat, a różnica między nimi reprezentuje liczbę wiadomości, które temat zawiera obecnie.


0

Fragmenty dokumentów Kafki

Wycofania w wersji 0.9.0.0

Element kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) został wycofany. Idąc dalej, użyj tej funkcji kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand).

Używam brokera Kafka z włączonym SSL zarówno dla serwera, jak i klienta. Poniżej polecenie, którego używam

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

gdzie / tmp / ssl_config jest jak poniżej

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password

0

Jeśli masz dostęp do interfejsu JMX serwera, przesunięcia początku i końca są obecne w:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(musisz wymienić TOPICNAME& PARTITIONNUMBER). Pamiętaj, że musisz sprawdzić dla każdej z replik danej partycji, lub musisz dowiedzieć się, który z brokerów jest liderem dla danej partycji (i może się to zmieniać w czasie).

Alternatywnie możesz użyć metod Kafka ConsumerbeginningOffsets i endOffsets.


-1

Nie próbowałem ten sam, ale wydaje się mieć sens.

Możesz także użyć kafka.tools.ConsumerOffsetChecker( źródło ).



-1

Możesz użyć kafkatool . Sprawdź ten link -> http://www.kafkatool.com/download.html

Kafka Tool to aplikacja GUI do zarządzania i używania klastrów Apache Kafka. Zapewnia intuicyjny interfejs użytkownika, który pozwala szybko przeglądać obiekty w klastrze Kafka, a także wiadomości przechowywane w tematach klastra.wprowadź opis obrazu tutaj

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.