Jak UPSERT (POŁĄCZ, WSTAW… W DUPLIKATOWEJ AKTUALIZACJI) w PostgreSQL?


267

Bardzo często zadawanym pytaniem jest tutaj, jak zrobić upsert, czyli to, co wywołuje MySQL, INSERT ... ON DUPLICATE UPDATEa standardowe obsługuje jako częśćMERGE operacji.

Biorąc pod uwagę, że PostgreSQL nie obsługuje go bezpośrednio (przed pg 9.5), jak to zrobić? Rozważ następujące:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Teraz wyobraź sobie, że chcesz „upsert” krotek (2, 'Joe'), (3, 'Alan')tak nowe zawartość tabeli będzie:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

O tym ludzie rozmawiają podczas dyskusji upsert. Co najważniejsze, każde podejście musi być bezpieczne w obecności wielu transakcji pracujących przy tym samym stole - albo przez jawne blokowanie, albo w inny sposób bronić się przed wynikowymi warunkami wyścigu.

Ten temat jest obszernie omawiany na stronie Insert, przy zduplikowanej aktualizacji w PostgreSQL?, ale chodzi o alternatywy dla składni MySQL, az czasem pojawiło się sporo niepowiązanych szczegółów. Pracuję nad ostatecznymi odpowiedziami.

Techniki te są również przydatne do „wstaw, jeśli nie istnieje, w przeciwnym razie nic nie rób”, tj. „Wstaw ... przy zduplikowanym kluczu ignoruj”.



8
@MichaelHampton celem tutaj było stworzenie ostatecznej wersji, która nie byłaby mylona przez wiele nieaktualnych odpowiedzi - i zablokowana, aby nikt nie mógł nic na to poradzić. Nie zgadzam się z głosowaniem z bliska.
Craig Ringer

Dlaczego, to wkrótce stanie się przestarzałe - i zablokowane, aby nikt nie mógł nic na to poradzić.
Michael Hampton,

2
@MichaelHampton Jeśli jesteś zaniepokojony, być może mógłbyś oflagować ten, z którym się łączysz i poprosić o jego odblokowanie, aby można go było wyczyścić, a następnie możemy to scalić. Mam dość jedynego oczywistego zbliżenia as-dup za upsert będący tak mylącym i złym bałaganem.
Craig Ringer

1
Pytania i odpowiedzi nie są zablokowane!
Michael Hampton,

Odpowiedzi:


396

9.5 i nowsze:

PostgreSQL 9.5 i nowsze wsparcie INSERT ... ON CONFLICT UPDATE(i ON CONFLICT DO NOTHING), tj. Upsert.

Porównanie zON DUPLICATE KEY UPDATE .

Szybkie wyjaśnienie .

Aby zapoznać się z użytkowaniem, zobacz instrukcję - w szczególności klauzulę o konfliktach działań na schemacie składni oraz tekst wyjaśniający .

W przeciwieństwie do rozwiązań dla wersji 9.4 i starszych, które podano poniżej, ta funkcja działa z wieloma sprzecznymi wierszami i nie wymaga wyłącznego blokowania ani ponownej próby.

Zatwierdzenie dodające funkcję jest tutaj, a dyskusja wokół jej rozwoju jest tutaj .


Jeśli korzystasz z wersji 9.5 i nie musisz być kompatybilny wstecz, możesz teraz przestać czytać .


9.4 i starsze:

PostgreSQL nie ma żadnej wbudowanej UPSERT(lub MERGE) funkcji, a robienie tego skutecznie w obliczu równoczesnego użycia jest bardzo trudne.

W tym artykule szczegółowo omówiono problem .

Ogólnie rzecz biorąc, musisz wybrać jedną z dwóch opcji:

  • Indywidualne operacje wstawiania / aktualizacji w pętli ponownych prób; lub
  • Blokowanie stołu i łączenie partii

Pętla ponawiania pojedynczych wierszy

Użycie pojedynczych poprawek wierszy w pętli ponownej próby jest rozsądną opcją, jeśli chcesz, aby wiele połączeń jednocześnie próbowało wykonać wstawienia.

Dokumentacja PostgreSQL zawiera przydatną procedurę, która pozwoli ci to zrobić w pętli wewnątrz bazy danych . Chroni przed utraconymi aktualizacjami i wstawia rasy, w przeciwieństwie do większości naiwnych rozwiązań. Działa tylko w READ COMMITTEDtrybie i jest bezpieczny tylko wtedy, gdy jest to jedyna rzecz, którą robisz w transakcji. Funkcja nie będzie działać poprawnie, jeśli wyzwalacze lub dodatkowe unikalne klucze powodują unikalne naruszenia.

Ta strategia jest bardzo nieefektywna. Kiedy tylko jest to praktyczne, powinieneś ustawiać w kolejce prace i wykonać zbiorczy upsert, jak opisano poniżej.

Wiele prób rozwiązania tego problemu nie uwzględnia wycofania, więc powodują niekompletne aktualizacje. Dwie transakcje ścigają się ze sobą; jeden z nich pomyślnie INSERT; drugi otrzymuje duplikat błędu klucza i UPDATEzamiast tego robi błąd . Te UPDATEbloki czekają na INSERTwycofywania lub zatwierdzenia. Kiedy się wycofuje, UPDATEwarunek ponownego sprawdzania dopasowuje zero wierszy, więc nawet jeśli UPDATEzatwierdzenia nie wykonały oczekiwanego upsert. Musisz sprawdzić liczbę wierszy wyników i spróbować ponownie w razie potrzeby.

Niektóre próby rozwiązania nie uwzględniają również wyścigów SELECT. Jeśli spróbujesz oczywistego i prostego:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

wtedy, gdy dwa działają jednocześnie, istnieje kilka trybów awarii. Jednym z nich jest już omówiony problem z ponownym sprawdzaniem aktualizacji. Innym jest, gdy oba UPDATEjednocześnie, dopasowując zero wierszy i kontynuując. Potem oboje zrobić EXISTStest, który dzieje się przedINSERT . Oba mają zero wierszy, więc oba robią INSERT. Jeden kończy się niepowodzeniem z duplikatem błędu klucza.

Dlatego potrzebujesz pętli ponownej próby. Możesz pomyśleć, że możesz zapobiec zduplikowanym kluczowym błędom lub utracie aktualizacji dzięki sprytnemu SQL, ale nie możesz. Musisz sprawdzić liczbę wierszy lub obsłużyć zduplikowane błędy klucza (w zależności od wybranego podejścia) i spróbować ponownie.

Nie rzucaj na to własnym rozwiązaniem. Podobnie jak w przypadku kolejkowania wiadomości, prawdopodobnie jest to złe.

Masywny upsert z zamkiem

Czasami chcesz zrobić zbiorczy upsert, w którym masz nowy zestaw danych, który chcesz scalić ze starszym istniejącym zestawem danych. Jest to znacznie bardziej wydajne niż wstawianie pojedynczych rzędów i powinno być preferowane, gdy jest to praktyczne.

W takim przypadku zazwyczaj postępujesz według następującego procesu:

  • CREATETEMPORARYstół

  • COPY lub wstaw zbiorczo nowe dane do tabeli temp

  • LOCKtabela docelowa IN EXCLUSIVE MODE. Pozwala to na inne transakcje SELECT, ale nie wprowadza żadnych zmian w tabeli.

  • Wykonaj jeden UPDATE ... FROMz istniejących rekordów, używając wartości z tabeli temp;

  • Wykonaj jeden INSERTz wierszy, które jeszcze nie istnieją w tabeli docelowej;

  • COMMIT, zwalniając blokadę.

Na przykład w przykładzie podanym w pytaniu, używając wielowartościowego INSERTdo wypełnienia tabeli temp:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Powiązane czytanie

Co MERGE?

Standard SQL MERGEma właściwie źle zdefiniowaną semantykę współbieżności i nie nadaje się do upserowania bez uprzedniego zablokowania tabeli.

To naprawdę przydatna instrukcja OLAP do łączenia danych, ale tak naprawdę nie jest to przydatne rozwiązanie dla bezpiecznego współbieżności. Istnieje wiele porad dla osób korzystających z innych DBMS do używania MERGEupserts, ale tak naprawdę jest to złe.

Inne bazy danych:


Czy w zbiorczej poprawce jest możliwa wartość usuwania z nowych wartości zamiast filtrowania WSTAWIANIA? Np. Z aktualizacją JAK (AKTUALIZACJA ... POWRÓT newvals.id) USUŃ Z newvals ZA POMOCĄ upd GDZIE newvals.id = upd.id, a następnie pusty WSTAWIĆ do testowalnego WYBIERZ * Z nowości? Mój pomysł z tym: zamiast filtrować dwa razy w INSERT (dla JOIN / WHERE i dla unikalnego ograniczenia), użyj ponownie wyników kontroli istnienia z UPDATE, które są już w pamięci RAM i mogą być znacznie mniejsze. Może to być wygrana, jeśli kilka pasujących rzędów i / lub nowe wartości są znacznie mniejsze niż w tabeli testowej.
Gunnlaugur Briem

1
Nadal istnieją nierozwiązane problemy, a dla innych dostawców nie jest jasne, co działa, a co nie. 1. Jak wspomniano, zapętlenie Postgres nie działa w przypadku wielu unikalnych kluczy. 2. Duplikat klucza dla mysql również nie działa dla wielu unikalnych kluczy. 3. Czy inne rozwiązania dla MySQL, SQL Server i Oracle przedstawione powyżej działają? Czy w takich przypadkach możliwe są wyjątki i czy musimy zapętlać?
dan b

@danb To tak naprawdę dotyczy tylko PostgreSQL. Nie ma rozwiązania dla różnych dostawców. Rozwiązanie dla PostgreSQL nie działa dla wielu wierszy, niestety musisz wykonać jedną transakcję na wiersz. Jak MERGEwspomniano powyżej, „rozwiązania” stosowane w SQL Server i Oracle są niepoprawne i podatne na warunki wyścigowe. Musisz dokładnie przyjrzeć się każdemu DBMS, aby dowiedzieć się, jak sobie z nimi poradzić. Naprawdę mogę zaoferować tylko porady na temat PostgreSQL. Jedynym sposobem na wykonanie bezpiecznego wielowierszowego uaktualnienia na PostgreSQL jest dodanie obsługi macierzystego uaktualnienia do serwera głównego.
Craig Ringer

Nawet w przypadku PostGresQL rozwiązanie nie działa w przypadku, gdy tabela ma wiele unikalnych kluczy (aktualizacja tylko jednego wiersza). W takim przypadku musisz określić, który klucz jest aktualizowany. Może istnieć rozwiązanie dla różnych dostawców korzystające na przykład z jdbc.
dan b

2
Postgres obsługuje teraz UPSERT - git.postgresql.org/gitweb/…
Chris

32

Próbuję wnieść wkład w inne rozwiązanie problemu pojedynczego wstawiania w wersjach PostgreSQL wcześniejszych niż 9.5. Chodzi o to, aby po prostu spróbować wykonać najpierw wstawienie, a jeśli rekord już istnieje, zaktualizuj go:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Pamiętaj, że to rozwiązanie można zastosować tylko wtedy, gdy nie zostaną usunięte wiersze tabeli .

Nie wiem o skuteczności tego rozwiązania, ale wydaje mi się to dość rozsądne.


3
Dziękuję, właśnie tego szukałem. Nie mogę zrozumieć, dlaczego tak trudno było je znaleźć.
isapir

4
Tak. Uproszczenie to działa tylko wtedy, gdy nie ma żadnych usunięć.
Craig Ringer

@CraigRinger Czy możesz wyjaśnić, co dokładnie stanie się, jeśli zostaną usunięte?
turbanoff

@turbanoff Wstawianie może się nie powieść, ponieważ rekord już tam jest, a następnie jest usuwany jednocześnie, a aktualizacja wpływa na zero wierszy, ponieważ wiersz został usunięty.
Craig Ringer

@CraigRinger So. Usuwanie odbywa się jednocześnie . Jakie są możliwe outways jeśli to jest działa dobrze? Jeśli usuwanie działa jednocześnie - wówczas można je wykonać tuż po naszym bloku. Co próbuję powiedzieć - jeśli jednocześnie usuwamy - ten kod działa tak samo jak właściwyinsert on update
turbanoff

28

Oto kilka przykładów insert ... on conflict ...( str. 9.5+ ):

  • Wstaw w razie konfliktu - nie rób nic .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
  • Wstaw, w przypadku konfliktu - wykonaj aktualizację , określ konflikt docelowy za pomocą kolumny .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
  • Wstaw, w przypadku konfliktu - wykonaj aktualizację , określ cel konfliktu za pomocą nazwy ograniczenia .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;

świetna odpowiedź - pytanie: dlaczego lub w jakiej sytuacji należy zastosować specyfikację celu poprzez nazwę kolumny lub ograniczenia? Czy istnieją zalety / wady różnych przypadków użycia?
Nathan Benton

1
@NathanBenton Myślę, że istnieją co najmniej 2 różnice: (1) nazwa kolumny jest określona przez programistę, podczas gdy nazwa ograniczenia może być określona przez programistę lub wygenerowana przez bazę danych zgodnie z nazwami tabel / kolumn. (2) każda kolumna może mieć wiele ograniczeń. To powiedziawszy, to zależy od twojego wyboru, którego użyć.
Eric Wang

8

Wprowadzenie SQLAlchemy dla Postgres> = 9.5

Ponieważ powyższy duży post obejmuje wiele różnych podejść SQL do wersji Postgres (nie tylko nie-9.5 jak w pytaniu), chciałbym dodać, jak to zrobić w SQLAlchemy, jeśli używasz Postgres 9.5. Zamiast implementować własny upsert, możesz także użyć funkcji SQLAlchemy (które zostały dodane w SQLAlchemy 1.1). Osobiście polecam korzystanie z nich, jeśli to możliwe. Nie tylko ze względu na wygodę, ale także dlatego, że pozwala PostgreSQLowi obsługiwać wszelkie warunki wyścigowe, które mogą wystąpić.

Cross-posting z innej odpowiedzi, którą wczoraj podałem ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy obsługuje ON CONFLICTteraz dwie metody on_conflict_do_update()i on_conflict_do_nothing():

Kopiowanie z dokumentacji:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert


4
Python i SQLAlchemy nie są wymienione w pytaniu.
Alexander Emelianov

Często używam Pythona w pisanych przeze mnie rozwiązaniach. Ale nie zajrzałem do SQLAlchemy (lub byłem tego świadomy). To wydaje się elegancką opcją. Dziękuję Ci. Jeśli się sprawdzi, przedstawię to mojej organizacji.
Robert,

3
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Testowane na Postgresql 9.3


@CraigRinger: czy mógłbyś rozwinąć tę kwestię? Cte nie jest atomowy?
Paryż

2
@parisni Nie. Każdy termin CTE otrzymuje własną migawkę, jeśli wykonuje zapisy. Nie ma też żadnego rodzaju blokowania predykatów w wierszach, które nie zostały znalezione, dzięki czemu można je tworzyć jednocześnie przez inną sesję. Jeśli użyjesz SERIALIZABLEizolacji, przerwiesz z niepowodzeniem serializacji, w przeciwnym razie prawdopodobnie dostaniesz wyjątkowe naruszenie. Nie wymyślaj na nowo, ponowne wymyślenie będzie błędne. Zastosowanie INSERT ... ON CONFLICT .... Jeśli twój PostgreSQL jest za stary, zaktualizuj go.
Craig Ringer

@CraigRinger INSERT ... ON CLONFLICT ...nie jest przeznaczony do masowego ładowania. Z twojego postu LOCK TABLE testtable IN EXCLUSIVE MODE;wewnątrz CTE jest obejście, które pozwala uzyskać rzeczy atomowe. Nie
Paryż

@parisni Nie jest przeznaczony do masowego załadunku? Mówi kto postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT . Oczywiście, jest znacznie wolniejszy niż ładowanie zbiorcze bez zachowania przypominającego upsert, ale jest to oczywiste i tak będzie bez względu na to, co robisz. Jest to o wiele szybsze niż stosowanie podtransakcji, to na pewno. Najszybszym podejściem jest oczywiście zablokowanie tabeli docelowej, a następnie zrobienie czegoś insert ... where not exists ...podobnego lub podobnego.
Craig Ringer

1

Ponieważ to pytanie zostało zamknięte, piszę tutaj o tym, jak to zrobić za pomocą SQLAlchemy. Poprzez rekurencję ponawia próbę włożenia lub aktualizacji zbiorczej w celu zwalczania warunków wyścigu i błędów walidacji.

Najpierw import

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Teraz działa kilka pomocników

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

I wreszcie funkcja upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Oto jak z niego korzystasz

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

Zaletą tego jest bulk_save_objectsto, że może obsługiwać relacje, sprawdzanie błędów itp. Podczas wstawiania (w przeciwieństwie do operacji masowych ).


Dla mnie też wygląda to źle. Co się stanie, jeśli sesja równoległa wstawi wiersz po zebraniu listy identyfikatorów? Lub usuwa jeden?
Craig Ringer

dobra uwaga @CraigRinger Robię coś podobnego do tego, ale wykonuję tylko jedną sesję. Jaki jest zatem najlepszy sposób obsługi wielu sesji? Może transakcja?
reubano

Transakcje nie są magicznym rozwiązaniem wszystkich problemów dotyczących współbieżności. Możesz używać SERIALIZABLE transakcji i obsługiwać błędy serializacji, ale jest to powolne. Potrzebujesz obsługi błędów i ponownej próby. Zobacz moją odpowiedź i sekcję „powiązane czytanie”.
Craig Ringer

@CraigRinger gotcha. W rzeczywistości zaimplementowałem pętlę ponownych prób w moim przypadku z powodu innych błędów sprawdzania poprawności. Zaktualizuję odpowiednio tę odpowiedź.
reubano
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.