Testy jednostkowe z django-seler?


82

Próbuję wymyślić metodologię testowania dla naszego projektu django-seler . Przeczytałem notatki w dokumentacji , ale nie dały mi one dobrego pojęcia, co właściwie mam robić. Nie martwię się testowaniem zadań w rzeczywistych demonach, tylko funkcjonalność mojego kodu. Głównie się zastanawiam:

  1. Jak możemy ominąć task.delay()podczas testu (próbowałem ustawić, CELERY_ALWAYS_EAGER = Trueale to bez różnicy)?
  2. W jaki sposób używamy zalecanych ustawień testowych (jeśli to najlepszy sposób) bez faktycznej zmiany naszego settings.py?
  3. Can we still use manage.py test or do we have to use a custom runner?

Overall any hints or tips for testing with celery would be very helpful.


1
co masz na myśli, CELERY_ALWAYS_EAGERnie robi różnicy
asksol

Nadal otrzymuję błędy dotyczące braku możliwości skontaktowania się z rabbitmq.
Jason Webb

Czy masz śledzenie? Myślę, że coś innego niż .delaypróba nawiązania połączenia.
asksol

11
BROKER_BACKEND=memoryW takim przypadku ustawienie może pomóc.
asksol

Zapytaj, czy miałeś rację. BROKER_BACKEND=memorynaprawione. Jeśli podasz to jako odpowiedź, zaznaczę to jako poprawne.
Jason Webb

Odpowiedzi:



72

Lubię używać dekoratora override_settings w testach, które wymagają wyników selera.

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

Jeśli chcesz zastosować to do wszystkich testów, możesz użyć narzędzia do uruchamiania testów selera, jak opisano na http://docs.celeryproject.org/en/2.5/django/unit-testing.html, który zasadniczo określa te same ustawienia, z wyjątkiem ( BROKER_BACKEND = 'memory').

W ustawieniach:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

Spójrz na źródło CeleryTestSuiteRunner i jest całkiem jasne, co się dzieje.


1
To nie zadziałało z selerem 4, nawet przy przemianach pól stąd
shadi

Działa na Seler 3.1. Po prostu mam moje przypadki testowe Selera dziedziczone z klasy nadrzędnej z tym dekoratorem. W ten sposób jest potrzebny tylko w jednym miejscu i nie ma potrzeby ciągnięcia djcelery.
kontextify

1
Działa to świetnie na Seler 4.4. i Django 2.2. Najlepsze podejście do uruchamiania testów jednostkowych, z jakim się do tej pory spotkałem.
Erik Kalkoken

18

Oto fragment mojej testowej klasy bazowej, która odcina apply_asyncmetodę i zapisuje wywołania do niej (co obejmuje Task.delay.) Jest trochę obrzydliwy, ale udało mi się dopasować do moich potrzeb w ciągu ostatnich kilku miesięcy, kiedy z niej korzystałem.

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

Oto przykład „z góry głowy”, jak można go użyć w przypadkach testowych:

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)

4

ponieważ nadal widzę ten komunikat w wynikach wyszukiwania, zastąpienie ustawień za pomocą

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

pracował dla mnie zgodnie z Celery Docs


1

Dla wszystkich, którzy przybędą tu w 2019 roku: zapoznaj się z tym artykułem obejmującym różne strategie, w tym synchroniczne wywoływanie zadań.


1

To właśnie zrobiłem

Wewnątrz myapp.tasks.py mam:

from celery import shared_task

@shared_task()
def add(a, b):
    return a + b

Wewnątrz myapp.test_tasks.py mam:

from django.test import TestCase, override_settings
from myapp.tasks import add


class TasksTestCase(TestCase):

    def setUp(self):
        ...

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True)
    def test_create_sections(self):
        result= add.delay(1,2)
        assert result.successful() == True
        assert result.get() == 3
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.