Pobierz listę zadań w kolejce w Seler


Odpowiedzi:


174

EDYTUJ: Zobacz inne odpowiedzi, aby uzyskać listę zadań w kolejce.

Powinieneś zajrzeć tutaj: Przewodnik po selerze - Inspekcja pracowników

Zasadniczo to:

from celery.app.control import Inspect

# Inspect all nodes.
i = Inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

W zależności od tego, czego chcesz


9
Próbowałem, ale to naprawdę powolne (jak 1 sekunda). Używam go synchronicznie w aplikacji tornado do monitorowania postępów, więc musi być szybki.
JulienFr

41
Nie zwróci to listy zadań w kolejce, które nie zostały jeszcze przetworzone.
Ed J

9
Użyj, i.reserved()aby uzyskać listę zadań w kolejce.
Banan

4
Czy ktoś doświadczył, że i.reserved () nie ma dokładnej listy aktywnych zadań? Mam uruchomione zadania, których nie ma na liście. Jestem na django-seler == 3.1.10
Seperman

6
Przy określaniu pracownika musiałem użyć listy jako argument: inspect(['celery@Flatty']). Ogromna poprawa szybkości inspect().
Adversus

42

jeśli używasz rabbitMQ, użyj tego w terminalu:

sudo rabbitmqctl list_queues

wypisze listę kolejek z liczbą oczekujących zadań. na przykład:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

liczba w prawej kolumnie to liczba zadań w kolejce. powyżej, kolejka selera ma 166 oczekujących zadań.


1
Znam się na tym, gdy mam uprawnienia sudo, ale chcę, aby nieuprzywilejowany użytkownik systemu mógł to sprawdzić - jakieś sugestie?
sage

Ponadto możesz to przepuścić, grep -e "^celery\s" | cut -f2aby wyodrębnić, że 166jeśli chcesz przetworzyć tę liczbę później, powiedzmy dla statystyk.
jamesc

21

Jeśli nie używasz zadań z priorytetami, jest to całkiem proste, jeśli używasz Redis. Aby otrzymać zadanie liczy się:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

Ale zadania z priorytetami używają innego klucza w redis , więc pełny obraz jest nieco bardziej skomplikowany. Pełny obraz jest taki, że należy zapytać redis o każdy priorytet zadania. W Pythonie (i z projektu Flower) wygląda to tak:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

Jeśli chcesz otrzymać aktualne zadanie, możesz użyć czegoś takiego:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

Stamtąd będziesz musiał deserializować zwróconą listę. W moim przypadku udało mi się to osiągnąć za pomocą czegoś takiego:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

Ostrzegam tylko, że deserializacja może chwilę potrwać i będziesz musiał dostosować powyższe polecenia, aby działały z różnymi priorytetami.


Po użyciu tego w produkcji dowiedziałem się, że zawodzi, jeśli używasz zadań z priorytetami , ze względu na projekt selera.
mlissner

1
Zaktualizowałem powyższe, aby obsługiwać priorytetowe zadania. Postęp!
mlissner

1
Aby przeliterować, DATABASE_NUMBERdomyślnie używane jest 0i QUEUE_NAMEjest celery, więc redis-cli -n 0 llen celeryzwróci liczbę wiadomości w kolejce.
Vineet Bansal

W przypadku mojego selera nazwa kolejki jest '{{{0}}}{1}{2}'zamiast '{0}{1}{2}'. Poza tym działa to doskonale!
zupo

12

Aby pobrać zadania z zaplecza, użyj tego

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

2
ale „jobs” podaje tylko liczbę zadań w kolejce
bitnik

Zobacz stackoverflow.com/a/57807913/9843399, aby uzyskać pokrewną odpowiedź zawierającą nazwy zadań.
Caleb Syring,

10

Jeśli używasz Celery + Django, najprostszy sposób sprawdzania zadań za pomocą poleceń bezpośrednio z terminala w środowisku wirtualnym lub przy użyciu pełnej ścieżki do selera:

Dokument : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

Również jeśli używasz Celery + RabbitMQ , możesz sprawdzić listę kolejek za pomocą następującego polecenia:

Więcej informacji : https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

4
Jeśli masz zdefiniowany projekt, możesz użyćcelery -A my_proj inspect reserved
sashaboulouds

6

Rozwiązanie typu kopiuj-wklej dla Redis z serializacją json:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

Działa z Django. Po prostu nie zapomnij się zmienić yourproject.celery.


1
Jeśli używasz serializatora marynat, możesz zmienić body =linię na body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker

4

Wydaje się, że moduł inspekcji selera jest świadomy zadań tylko z perspektywy pracowników. Jeśli chcesz zobaczyć wiadomości, które są w kolejce (jeszcze nie zostały wyciągnięte przez pracowników), sugeruję użycie pyrabbita , który może współpracować z interfejsem http api rabbitmq w celu pobrania wszelkiego rodzaju informacji z kolejki.

Przykład można znaleźć tutaj: Pobierz długość kolejki za pomocą Celery (RabbitMQ, Django)


3

Myślę, że jedynym sposobem na uzyskanie zadań, które oczekują, jest zachowanie listy rozpoczętych zadań i pozwolenie zadaniu na usunięcie się z listy po uruchomieniu.

Z rabbitmqctl i list_queues możesz zobaczyć, ile zadań czeka, ale nie same zadania: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

Jeśli to, co chcesz, zawiera przetwarzane zadanie, ale jeszcze nie zostało ukończone, możesz zachować listę swoich zadań i sprawdzić ich stany:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

Możesz też pozwolić Selerowi przechowywać wyniki w CELERY_RESULT_BACKEND i sprawdzić, których zadań tam nie ma.


3

To zadziałało w mojej aplikacji:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

active_jobs będzie listą ciągów odpowiadających zadaniom w kolejce.

Nie zapomnij wymienić CELERY_APP_INSTANCE na własną.

Dziękuję @ashish za wskazanie mi właściwego kierunku i jego odpowiedź tutaj: https://stackoverflow.com/a/19465670/9843399


w moim przypadku jobszawsze wynosi zero ... jakiś pomysł?
daveoncode

@daveoncode Nie sądzę, że to wystarczająca ilość informacji, abym mógł udzielić pomocy. Możesz otworzyć własne pytanie. Nie sądzę, aby był to duplikat tego, jeśli określisz, że chcesz pobrać informacje w Pythonie. Wróciłbym do stackoverflow.com/a/19465670/9843399 , na którym oparłem moją odpowiedź, i upewniłem się, że zadziała w pierwszej kolejności.
Caleb Syring

@CalebSyring To jest pierwsze podejście, które naprawdę pokazuje mi zadania w kolejce. Bardzo dobrze. Jedynym problemem dla mnie jest to, że dołączanie listy nie wydaje się działać. Jakieś pomysły, jak mogę sprawić, by funkcja wywołania zwrotnego zapisywała się na liście?
Varlor,

@Varlor Przykro mi, ktoś dokonał niewłaściwej edycji mojej odpowiedzi. Możesz przejrzeć historię zmian w poszukiwaniu oryginalnej odpowiedzi, która najprawdopodobniej będzie dla Ciebie odpowiednia. Pracuję nad rozwiązaniem tego problemu. (EDYCJA: Właśnie wszedłem i odrzuciłem edycję, która miała oczywisty błąd Pythona. Daj mi znać, czy to rozwiązało problem, czy nie.)
Caleb Syring

@CalebSyring Użyłem teraz twojego kodu w klasie, mając listę jako atrybut klasy!
Varlor

2

O ile wiem, Celery nie udostępnia API do sprawdzania zadań, które czekają w kolejce. To jest specyficzne dla brokera. Jeśli na przykład używasz Redis jako brokera, zbadanie zadań oczekujących w celery(domyślnej) kolejce jest tak proste, jak:

  1. połączyć się z bazą danych brokera
  2. lista elementów na celeryliście (na przykład polecenie LRANGE)

Należy pamiętać, że są to zadania CZEKAJĄCE na wybranie przez dostępnych pracowników. W Twoim klastrze mogą być uruchomione pewne zadania - nie będzie ich na tej liście, ponieważ zostały już wybrane.


1

Doszedłem do wniosku, że najlepszym sposobem uzyskania liczby zadań w kolejce jest użycie, rabbitmqctlco zostało tu kilkakrotnie zasugerowane. Aby umożliwić dowolnemu wybranemu użytkownikowi uruchomienie polecenia z sudo, postępowałem zgodnie z instrukcjami tutaj (pominąłem edycję części profilu, ponieważ nie mam nic przeciwko wpisywaniu sudo przed poleceniem).

Złapałem też Jamesca grepi cutsnippet i zawinąłem je w wywołania podprocesu.

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

1
from celery.task.control import inspect
def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if self.key_in_list(task_id, task_list):
             return True
    return False

0

Jeśli kontrolujesz kod zadań, możesz obejść problem, pozwalając zadaniu wywołać trywialną ponowną próbę przy pierwszym wykonaniu, a następnie sprawdzając inspect().reserved(). Ponowna próba rejestruje zadanie z zapleczem wyników i seler to widzi. Zadanie musi zaakceptować selflub contextjako pierwszy parametr, abyśmy mogli uzyskać dostęp do liczby ponownych prób.

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

To rozwiązanie jest agnostyczne dla brokera, tj. nie musisz się martwić, czy do przechowywania zadań używasz RabbitMQ czy Redis.

EDYCJA: po przetestowaniu stwierdziłem, że jest to tylko częściowe rozwiązanie. Rozmiar zarezerwowanego jest ograniczony do ustawienia pobierania wstępnego dla pracownika.


0

Z subprocess.run:

import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
                                        stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))

Należy uważać, aby zmienić my_projzyour_proj

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.