Jeśli nowe kategorie pojawiają się bardzo rzadko, ja osobiście wolę rozwiązanie „jeden kontra wszystkie” dostarczone przez @oW_ . Dla każdej nowej kategorii trenujesz nowy model na X liczbie próbek z nowej kategorii (klasa 1) i X liczbie próbek z pozostałych kategorii (klasa 0).
Jeśli jednak często pojawiają się nowe kategorie i chcesz skorzystać z jednego wspólnego modelu, istnieje sposób na osiągnięcie tego za pomocą sieci neuronowych.
Podsumowując, po nadejściu nowej kategorii dodajemy odpowiedni nowy węzeł do warstwy softmax o zerowych (lub losowych) wagach i utrzymujemy nietknięte stare wagi, a następnie trenujemy model rozszerzony o nowe dane. Oto wizualny szkic pomysłu (narysowany przeze mnie):
Oto implementacja pełnego scenariusza:
Model jest szkolony w dwóch kategoriach,
Nadchodzi nowa kategoria,
Formaty modeli i docelowych są odpowiednio aktualizowane,
Model jest szkolony na nowych danych.
Kod:
from keras import Model
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from sklearn.metrics import f1_score
import numpy as np
# Add a new node to the last place in Softmax layer
def add_category(model, pre_soft_layer, soft_layer, new_layer_name, random_seed=None):
weights = model.get_layer(soft_layer).get_weights()
category_count = len(weights)
# set 0 weight and negative bias for new category
# to let softmax output a low value for new category before any training
# kernel (old + new)
weights[0] = np.concatenate((weights[0], np.zeros((weights[0].shape[0], 1))), axis=1)
# bias (old + new)
weights[1] = np.concatenate((weights[1], [-1]), axis=0)
# New softmax layer
softmax_input = model.get_layer(pre_soft_layer).output
sotfmax = Dense(category_count + 1, activation='softmax', name=new_layer_name)(softmax_input)
model = Model(inputs=model.input, outputs=sotfmax)
# Set the weights for the new softmax layer
model.get_layer(new_layer_name).set_weights(weights)
return model
# Generate data for the given category sizes and centers
def generate_data(sizes, centers, label_noise=0.01):
Xs = []
Ys = []
category_count = len(sizes)
indices = range(0, category_count)
for category_index, size, center in zip(indices, sizes, centers):
X = np.random.multivariate_normal(center, np.identity(len(center)), size)
# Smooth [1.0, 0.0, 0.0] to [0.99, 0.005, 0.005]
y = np.full((size, category_count), fill_value=label_noise/(category_count - 1))
y[:, category_index] = 1 - label_noise
Xs.append(X)
Ys.append(y)
Xs = np.vstack(Xs)
Ys = np.vstack(Ys)
# shuffle data points
p = np.random.permutation(len(Xs))
Xs = Xs[p]
Ys = Ys[p]
return Xs, Ys
def f1(model, X, y):
y_true = y.argmax(1)
y_pred = model.predict(X).argmax(1)
return f1_score(y_true, y_pred, average='micro')
seed = 12345
verbose = 0
np.random.seed(seed)
model = Sequential()
model.add(Dense(5, input_shape=(2,), activation='tanh', name='pre_soft_layer'))
model.add(Dense(2, input_shape=(2,), activation='softmax', name='soft_layer'))
model.compile(loss='categorical_crossentropy', optimizer=Adam())
# In 2D feature space,
# first category is clustered around (-2, 0),
# second category around (0, 2), and third category around (2, 0)
X, y = generate_data([1000, 1000], [[-2, 0], [0, 2]])
print('y shape:', y.shape)
# Train the model
model.fit(X, y, epochs=10, verbose=verbose)
# Test the model
X_test, y_test = generate_data([200, 200], [[-2, 0], [0, 2]])
print('model f1 on 2 categories:', f1(model, X_test, y_test))
# New (third) category arrives
X, y = generate_data([1000, 1000, 1000], [[-2, 0], [0, 2], [2, 0]])
print('y shape:', y.shape)
# Extend the softmax layer to accommodate the new category
model = add_category(model, 'pre_soft_layer', 'soft_layer', new_layer_name='soft_layer2')
model.compile(loss='categorical_crossentropy', optimizer=Adam())
# Test the extended model before training
X_test, y_test = generate_data([200, 200, 0], [[-2, 0], [0, 2], [2, 0]])
print('extended model f1 on 2 categories before training:', f1(model, X_test, y_test))
# Train the extended model
model.fit(X, y, epochs=10, verbose=verbose)
# Test the extended model on old and new categories separately
X_old, y_old = generate_data([200, 200, 0], [[-2, 0], [0, 2], [2, 0]])
X_new, y_new = generate_data([0, 0, 200], [[-2, 0], [0, 2], [2, 0]])
print('extended model f1 on two (old) categories:', f1(model, X_old, y_old))
print('extended model f1 on new category:', f1(model, X_new, y_new))
które wyjścia:
y shape: (2000, 2)
model f1 on 2 categories: 0.9275
y shape: (3000, 3)
extended model f1 on 2 categories before training: 0.8925
extended model f1 on two (old) categories: 0.88
extended model f1 on new category: 0.91
Powinienem wyjaśnić dwie kwestie dotyczące tego wyniku:
Wydajność modelu jest zmniejszana od 0.9275
do 0.8925
tylko przez dodanie nowego węzła. Jest tak, ponieważ dane wyjściowe nowego węzła są również uwzględniane przy wyborze kategorii. W praktyce dane wyjściowe nowego węzła należy uwzględnić dopiero po przeszkoleniu modelu na sporej próbce. Na przykład [0.15, 0.30, 0.55]
na tym etapie powinniśmy osiągnąć najwyższą z dwóch pierwszych pozycji , tj. 2. klasy.
Wydajność modelu rozszerzonego w dwóch (starych) kategoriach 0.88
jest mniejsza niż w starym modelu 0.9275
. Jest to normalne, ponieważ teraz model rozszerzony chce przypisać dane wejściowe do jednej z trzech kategorii zamiast dwóch. Ten spadek jest również oczekiwany, gdy wybieramy spośród trzech klasyfikatorów binarnych w porównaniu do dwóch klasyfikatorów binarnych w podejściu „jeden kontra wszyscy”.