Wieloprocesorowość Django i połączenia z bazami danych


85

Tło:

Pracuję nad projektem, który używa Django z bazą danych Postgres. Używamy również mod_wsgi na wypadek, gdyby miało to znaczenie, ponieważ niektóre z moich wyszukiwań w Internecie wspominały o tym. Po przesłaniu formularza internetowego widok Django uruchamia zadanie, które zajmie znaczną ilość czasu (więcej niż użytkownik chciałby czekać), więc rozpoczynamy zadanie poprzez wywołanie systemowe w tle. Zadanie, które jest teraz uruchomione, musi mieć możliwość odczytu i zapisu w bazie danych. Ponieważ to zadanie trwa tak długo, używamy przetwarzania wieloprocesowego do równoległego uruchamiania jego części.

Problem:

Skrypt najwyższego poziomu ma połączenie z bazą danych i kiedy uruchamia się z procesów potomnych, wydaje się, że połączenie rodzica jest dostępne dla dzieci. Następnie istnieje wyjątek dotyczący tego, jak należy wywołać ustawienie POZIOM IZOLACJI TRANSAKCJI przed zapytaniem. Badania wykazały, że jest to spowodowane próbą użycia tego samego połączenia z bazą danych w wielu procesach. Jeden wątek, który znalazłem, sugerował wywołanie connection.close () na początku procesów potomnych, aby Django automatycznie utworzyło nowe połączenie, gdy będzie to potrzebne, a zatem każdy proces potomny będzie miał unikalne połączenie - tj. Nie będzie współdzielone. To nie zadziałało dla mnie, ponieważ wywołanie connection.close () w procesie potomnym spowodowało, że proces nadrzędny skarżył się, że połączenie zostało utracone.

Inne ustalenia:

Niektóre rzeczy, które przeczytałem, zdawały się wskazywać, że tak naprawdę nie możesz tego zrobić, a przetwarzanie wieloprocesowe, mod_wsgi i Django nie współpracują dobrze. Wydaje mi się, że trudno w to uwierzyć.

Niektórzy sugerowali użycie selera, co może być rozwiązaniem długoterminowym, ale w tej chwili nie mogę zainstalować selera w oczekiwaniu na procesy zatwierdzania, więc nie jest to teraz opcja.

Znalazłem kilka odniesień w SO i innych miejscach na temat trwałych połączeń z bazą danych, co moim zdaniem stanowi inny problem.

Znalazłem również odniesienia do psycopg2.pool i pgpool oraz coś o bouncer. Wprawdzie nie rozumiałem większości z tego, co czytałem na nich, ale z pewnością nie wyskoczyło mi to jako to, czego szukałem.

Bieżące „Obejście”:

Na razie wróciłem do uruchamiania rzeczy po kolei i to działa, ale jest wolniejsze niż bym chciał.

Jakieś sugestie, w jaki sposób mogę równolegle korzystać z przetwarzania wieloprocesowego? Wygląda na to, że gdybym mógł mieć rodzica i dwoje dzieci, wszyscy mieliby niezależne połączenia z bazą danych, wszystko byłoby w porządku, ale nie mogę uzyskać takiego zachowania.

Dzięki i przepraszam za długość!

Odpowiedzi:


71

Przetwarzanie wieloprocesowe kopiuje obiekty połączeń między procesami, ponieważ forsuje procesy, a zatem kopiuje wszystkie deskryptory plików procesu nadrzędnego. To powiedziawszy, połączenie z serwerem SQL to tylko plik, możesz go zobaczyć w linuksie w / proc // fd / .... każdy otwarty plik będzie współdzielony między rozwidlonymi procesami. Więcej o rozwidleniu znajdziesz tutaj .

Moje rozwiązanie polegało po prostu na zamknięciu połączenia db tuż przed uruchomieniem procesów, każdy proces odtwarza połączenie, gdy będzie go potrzebował (testowane w django 1.4):

from django import db
db.connections.close_all()
def db_worker():      
    some_paralell_code()
Process(target = db_worker,args = ())

Pgbouncer / pgpool nie jest powiązany z wątkami w rozumieniu wieloprocesowości. Jest to raczej rozwiązanie polegające na nie zamykaniu połączenia przy każdym żądaniu = przyspieszenie łączenia się z postgresami pod dużym obciążeniem.

Aktualizacja:

Aby całkowicie usunąć problemy z połączeniem z bazą danych, po prostu przenieś całą logikę związaną z bazą danych do db_worker - chciałem przekazać QueryDict jako argument ... Lepszym pomysłem jest po prostu przekazanie listy identyfikatorów ... Zobacz QueryDict i values_list ('id', flat = Prawda) i nie zapomnij włączyć jej do listy! list (QueryDict) przed przekazaniem do db_worker. Dzięki temu nie kopiujemy połączenia z bazą modeli.

def db_worker(models_ids):        
    obj = PartModelWorkerClass(model_ids) # here You do Model.objects.filter(id__in = model_ids)
    obj.run()


model_ids = Model.objects.all().values_list('id', flat=True)
model_ids = list(model_ids) # cast to list
process_count = 5
delta = (len(model_ids) / process_count) + 1

# do all the db stuff here ...

# here you can close db connection
from django import db
db.connections.close_all()

for it in range(0:process_count):
    Process(target = db_worker,args = (model_ids[it*delta:(it+1)*delta]))   

czy mógłbyś wyjaśnić to trochę o przekazywaniu identyfikatorów z zestawu zapytań do pytania, na które odpowiedziano samodzielnie?
Jharwood

1
przetwarzanie wieloprocesowe kopiuje obiekty połączeń między procesami, ponieważ forsuje procesy, a zatem kopiuje wszystkie deskryptory plików procesu nadrzędnego. To powiedziawszy, połączenie z serwerem mysql to tylko plik, możesz go zobaczyć w linuksie w / proc / <PID> / fd / .... każdy otwarty plik będzie współdzielony między rozwidlonymi procesami AFAIK. stackoverflow.com/questions/4277289/…
vlad-ardelean

1
Czy dotyczy to również wątków? Na przykład. zamknij db conn w głównym wątku, a następnie uzyskaj dostęp do bazy danych w każdym wątku, czy każdy wątek uzyska własne połączenie?
James Lin

1
Należy użyć, django.db.connections.close_all()aby zamknąć wszystkie połączenia jednym połączeniem.
Denis Malinovsky

1
Hm ... Tutaj jest dość ciekawa rozmowa między ludźmi z django: code.djangoproject.com/ticket/20562 może rzuci trochę światła na ten temat? Zasadniczo połączenia „nie są forkable”… Każdy proces powinien mieć własne połączenie.
lechup

18

W przypadku korzystania z wielu baz danych należy zamknąć wszystkie połączenia.

from django import db
for connection_name in db.connections.databases:
    db.connections[connection_name].close()

EDYTOWAĆ

Użyj tego samego, co wspomniany @lechup, aby zamknąć wszystkie połączenia (nie jestem pewien, od której wersji django ta metoda została dodana):

from django import db
db.connections.close_all()

9
jest to po prostu wielokrotne wywoływanie db.close_connection
ibz

2
Nie rozumiem, jak to może działać bez używania aliasu lub informacji w dowolnym miejscu.
RemcoGerlich

To ... nie może działać. @Mounir, należy zmodyfikować go do używania aliaslub infow forciele pętli, jeśli dblub close_connection()wsporniki, które.
0atman

5

W przypadku Pythona 3 i Django 1.9 to zadziałało:

import multiprocessing
import django
django.setup() # Must call setup

def db_worker():
    for name, info in django.db.connections.databases.items(): # Close the DB connections
        django.db.connection.close()
    # Execute parallel code here

if __name__ == '__main__':
    multiprocessing.Process(target=db_worker)

Zauważ, że bez django.setup () nie mógłbym tego uruchomić. Domyślam się, że do przetwarzania wieloprocesowego trzeba ponownie zainicjować.


Dzięki! To zadziałało dla mnie i prawdopodobnie powinno być teraz akceptowaną odpowiedzią dla nowszych wersji django.
krischan

Sposób w django polega na utworzeniu polecenia zarządzającego, a nie na tworzeniu samodzielnego skryptu opakowującego. Jeśli nie używasz polecenia zarządzania, musisz użyć setupdjango.
lechup

2
Twoja pętla for tak naprawdę nic nie robi db.connections.databases.items()- po prostu kilkakrotnie zamyka połączenie. db.connections.close_all()działa dobrze, o ile nazywa się to funkcją roboczą.
tao_oat

2

Miałem problemy z „zamkniętym połączeniem” podczas sekwencyjnego uruchamiania przypadków testowych Django . Oprócz testów istnieje również inny proces celowo modyfikujący bazę danych podczas wykonywania testów. Ten proces jest uruchamiany w każdym przypadku testowym setUp ().

Prostym rozwiązaniem było dziedziczenie moich klas testowych z TransactionTestCasezamiast TestCase. Daje to pewność, że baza danych została faktycznie zapisana, a drugi proces ma aktualny widok danych.


1

(niezbyt dobre rozwiązanie, ale możliwe obejście)

jeśli nie możesz użyć selera, może mógłbyś zaimplementować własny system kolejkowania, po prostu dodając zadania do jakiejś tabeli zadań i posiadając zwykły cron, który je wybiera i przetwarza? (za pomocą polecenia zarządzania)


prawdopodobnie - miał nadzieję uniknąć tego poziomu złożoności, ale jeśli to jedyne rozwiązanie, to być może będę musiał pójść tą drogą - dzięki za sugestię. Czy seler jest najlepszą odpowiedzią? jeśli tak, być może będę mógł naciskać, aby go zdobyć, ale zajmie to trochę czasu. Seler kojarzy mi się z przetwarzaniem rozproszonym w przeciwieństwie do przetwarzania równoległego na jednej maszynie, ale może to tylko mój brak doświadczenia z nim ..
daroo

2
seler jest dobrym rozwiązaniem dla każdego przetwarzania wymagane poza cyklem żądanie-odpowiedź
drugi

1

Hej, napotkałem ten problem i udało mi się go rozwiązać, wykonując następujące czynności (wdrażamy system ograniczonych zadań)

task.py

from django.db import connection

def as_task(fn):
    """  this is a decorator that handles task duties, like setting up loggers, reporting on status...etc """ 
    connection.close()  #  this is where i kill the database connection VERY IMPORTANT
    # This will force django to open a new unique connection, since on linux at least
    # Connections do not fare well when forked 
    #...etc

ScheduledJob.py

from django.db import connection

def run_task(request, job_id):
    """ Just a simple view that when hit with a specific job id kicks of said job """ 
    # your logic goes here
    # ...
    processor = multiprocessing.Queue()
    multiprocessing.Process(
        target=call_command,  # all of our tasks are setup as management commands in django
        args=[
            job_info.management_command,
        ],
        kwargs= {
            'web_processor': processor,
        }.items() + vars(options).items()).start()

result = processor.get(timeout=10)  # wait to get a response on a successful init
# Result is a tuple of [TRUE|FALSE,<ErrorMessage>]
if not result[0]:
    raise Exception(result[1])
else:
   # THE VERY VERY IMPORTANT PART HERE, notice that up to this point we haven't touched the db again, but now we absolutely have to call connection.close()
   connection.close()
   # we do some database accessing here to get the most recently updated job id in the database

Szczerze mówiąc, aby zapobiec sytuacjom wyścigu (z wieloma jednoczesnymi użytkownikami), najlepiej byłoby wywołać bazę danych database.close () tak szybko, jak to możliwe po rozwidleniu procesu. Jednak nadal może istnieć szansa, że ​​inny użytkownik gdzieś w dół całkowicie wyśle ​​żądanie do bazy danych, zanim będziesz miał szansę opróżnić bazę danych.

Szczerze mówiąc, byłoby prawdopodobnie bezpieczniejsze i mądrzejsze, gdyby widelec nie wywoływał polecenia bezpośrednio, ale zamiast tego wywoływał skrypt w systemie operacyjnym, aby uruchomione zadanie działało we własnej powłoce django!


Skorzystałem z twojego pomysłu na zamknięcie wewnątrz widelca zamiast wcześniej, do stworzenia dekoratora, który dodaję do moich funkcji roboczych.
Rebs

1

Możesz przekazać więcej zasobów Postgre, w Debianie / Ubuntu możesz edytować:

nano /etc/postgresql/9.4/main/postgresql.conf

zastępując 9.4 twoją wersją postgre.

Oto kilka przydatnych wierszy, które należy zaktualizować przykładowymi wartościami, aby to zrobić, nazwy mówią same za siebie:

max_connections=100
shared_buffers = 3000MB
temp_buffers = 800MB
effective_io_concurrency = 300
max_worker_processes = 80

Uważaj, aby nie zwiększyć zbyt mocno tych parametrów, ponieważ może to prowadzić do błędów, gdy Postgre będzie próbował pobrać więcej zasobów niż jest dostępne. Powyższe przykłady działają dobrze na maszynie Debian 8GB Ram wyposażonej w 4 rdzenie.


0

Jeśli wszystko, czego potrzebujesz, to równoległość we / wy, a nie równoległość przetwarzania, możesz uniknąć tego problemu, przełączając procesy na wątki. Zastąpić

from multiprocessing import Process

z

from threading import Thread

ThreadObiekt ma tego samego interfejsu jakProcsess


0

Jeśli korzystasz również z puli połączeń, zadziałały następujące rozwiązania, które wymuszały zamykanie połączeń po rozwidleniu. Wcześniej nie wydawało się pomagać.

from django.db import connections
from django.db.utils import DEFAULT_DB_ALIAS

connections[DEFAULT_DB_ALIAS].dispose()
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.