Jak mogę pobrać listę zadań w kolejce, które nie zostały jeszcze przetworzone?
Jak mogę pobrać listę zadań w kolejce, które nie zostały jeszcze przetworzone?
Odpowiedzi:
EDYTUJ: Zobacz inne odpowiedzi, aby uzyskać listę zadań w kolejce.
Powinieneś zajrzeć tutaj: Przewodnik po selerze - Inspekcja pracowników
Zasadniczo to:
from celery.app.control import Inspect
# Inspect all nodes.
i = Inspect()
# Show the items that have an ETA or are scheduled for later processing
i.scheduled()
# Show tasks that are currently active.
i.active()
# Show tasks that have been claimed by workers
i.reserved()
W zależności od tego, czego chcesz
i.reserved()
aby uzyskać listę zadań w kolejce.
inspect(['celery@Flatty'])
. Ogromna poprawa szybkości inspect()
.
jeśli używasz rabbitMQ, użyj tego w terminalu:
sudo rabbitmqctl list_queues
wypisze listę kolejek z liczbą oczekujących zadań. na przykład:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
celerey_mail_worker@torob2.celery.pidbox 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
liczba w prawej kolumnie to liczba zadań w kolejce. powyżej, kolejka selera ma 166 oczekujących zadań.
grep -e "^celery\s" | cut -f2
aby wyodrębnić, że 166
jeśli chcesz przetworzyć tę liczbę później, powiedzmy dla statystyk.
Jeśli nie używasz zadań z priorytetami, jest to całkiem proste, jeśli używasz Redis. Aby otrzymać zadanie liczy się:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Ale zadania z priorytetami używają innego klucza w redis , więc pełny obraz jest nieco bardziej skomplikowany. Pełny obraz jest taki, że należy zapytać redis o każdy priorytet zadania. W Pythonie (i z projektu Flower) wygląda to tak:
PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]
def make_queue_name_for_pri(queue, pri):
"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""
if pri not in DEFAULT_PRIORITY_STEPS:
raise ValueError('Priority not in priority steps')
return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
(queue, '', '')))
def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return sum([r.llen(x) for x in priority_names])
Jeśli chcesz otrzymać aktualne zadanie, możesz użyć czegoś takiego:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
Stamtąd będziesz musiał deserializować zwróconą listę. W moim przypadku udało mi się to osiągnąć za pomocą czegoś takiego:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Ostrzegam tylko, że deserializacja może chwilę potrwać i będziesz musiał dostosować powyższe polecenia, aby działały z różnymi priorytetami.
DATABASE_NUMBER
domyślnie używane jest 0
i QUEUE_NAME
jest celery
, więc redis-cli -n 0 llen celery
zwróci liczbę wiadomości w kolejce.
'{{{0}}}{1}{2}'
zamiast '{0}{1}{2}'
. Poza tym działa to doskonale!
Aby pobrać zadania z zaplecza, użyj tego
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
Jeśli używasz Celery + Django, najprostszy sposób sprawdzania zadań za pomocą poleceń bezpośrednio z terminala w środowisku wirtualnym lub przy użyciu pełnej ścieżki do selera:
Dokument : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled
Również jeśli używasz Celery + RabbitMQ , możesz sprawdzić listę kolejek za pomocą następującego polecenia:
Więcej informacji : https://linux.die.net/man/1/rabbitmqctl
$ sudo rabbitmqctl list_queues
celery -A my_proj inspect reserved
Rozwiązanie typu kopiuj-wklej dla Redis z serializacją json:
def get_celery_queue_items(queue_name):
import base64
import json
# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []
for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)
return decoded_tasks
Działa z Django. Po prostu nie zapomnij się zmienić yourproject.celery
.
body =
linię na body = pickle.loads(base64.b64decode(j['body']))
.
Wydaje się, że moduł inspekcji selera jest świadomy zadań tylko z perspektywy pracowników. Jeśli chcesz zobaczyć wiadomości, które są w kolejce (jeszcze nie zostały wyciągnięte przez pracowników), sugeruję użycie pyrabbita , który może współpracować z interfejsem http api rabbitmq w celu pobrania wszelkiego rodzaju informacji z kolejki.
Przykład można znaleźć tutaj: Pobierz długość kolejki za pomocą Celery (RabbitMQ, Django)
Myślę, że jedynym sposobem na uzyskanie zadań, które oczekują, jest zachowanie listy rozpoczętych zadań i pozwolenie zadaniu na usunięcie się z listy po uruchomieniu.
Z rabbitmqctl i list_queues możesz zobaczyć, ile zadań czeka, ale nie same zadania: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Jeśli to, co chcesz, zawiera przetwarzane zadanie, ale jeszcze nie zostało ukończone, możesz zachować listę swoich zadań i sprawdzić ich stany:
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
Możesz też pozwolić Selerowi przechowywać wyniki w CELERY_RESULT_BACKEND i sprawdzić, których zadań tam nie ma.
To zadziałało w mojej aplikacji:
def get_celery_queue_active_jobs(queue_name):
connection = <CELERY_APP_INSTANCE>.connection()
try:
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
active_jobs = []
def dump_message(message):
active_jobs.append(message.properties['application_headers']['task'])
channel.basic_consume(queue=queue_name, callback=dump_message)
for job in range(jobs):
connection.drain_events()
return active_jobs
finally:
connection.close()
active_jobs
będzie listą ciągów odpowiadających zadaniom w kolejce.
Nie zapomnij wymienić CELERY_APP_INSTANCE na własną.
Dziękuję @ashish za wskazanie mi właściwego kierunku i jego odpowiedź tutaj: https://stackoverflow.com/a/19465670/9843399
jobs
zawsze wynosi zero ... jakiś pomysł?
O ile wiem, Celery nie udostępnia API do sprawdzania zadań, które czekają w kolejce. To jest specyficzne dla brokera. Jeśli na przykład używasz Redis jako brokera, zbadanie zadań oczekujących w celery
(domyślnej) kolejce jest tak proste, jak:
celery
liście (na przykład polecenie LRANGE)Należy pamiętać, że są to zadania CZEKAJĄCE na wybranie przez dostępnych pracowników. W Twoim klastrze mogą być uruchomione pewne zadania - nie będzie ich na tej liście, ponieważ zostały już wybrane.
Doszedłem do wniosku, że najlepszym sposobem uzyskania liczby zadań w kolejce jest użycie, rabbitmqctl
co zostało tu kilkakrotnie zasugerowane. Aby umożliwić dowolnemu wybranemu użytkownikowi uruchomienie polecenia z sudo
, postępowałem zgodnie z instrukcjami tutaj (pominąłem edycję części profilu, ponieważ nie mam nic przeciwko wpisywaniu sudo przed poleceniem).
Złapałem też Jamesca grep
i cut
snippet i zawinąłem je w wywołania podprocesu.
from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
Jeśli kontrolujesz kod zadań, możesz obejść problem, pozwalając zadaniu wywołać trywialną ponowną próbę przy pierwszym wykonaniu, a następnie sprawdzając inspect().reserved()
. Ponowna próba rejestruje zadanie z zapleczem wyników i seler to widzi. Zadanie musi zaakceptować self
lub context
jako pierwszy parametr, abyśmy mogli uzyskać dostęp do liczby ponownych prób.
@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...
To rozwiązanie jest agnostyczne dla brokera, tj. nie musisz się martwić, czy do przechowywania zadań używasz RabbitMQ czy Redis.
EDYCJA: po przetestowaniu stwierdziłem, że jest to tylko częściowe rozwiązanie. Rozmiar zarezerwowanego jest ograniczony do ustawienia pobierania wstępnego dla pracownika.
Z subprocess.run
:
import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))
Należy uważać, aby zmienić my_proj
zyour_proj