Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
PyTorch — современная библиотека машинного обучения с открытым исходным кодом, разработанная компанией Facebook. Как и другие популярные библиотеки, такие как TensorFlow и Keras, PyTorch позволяет использовать вычислительную мощность видеокарт, автоматически составлять граф вычислений, дифференцировать и считать его. Но, в отличие от предыдущих библиотек, обладает более гибким функционалом, благодаря тому, что использует динамический граф вычислений.
Сейчас мы пройдем все этапы работы с библиотекой PyTorch. Мы затронем далеко не все возможности данной библиотеки, но их хватит, чтобы начать с ней работать. Научимся пользоваться инструментами для подготовки данных, которые делают загрузку данных легкой и уменьшают объем написанного кода. Создадим простую нейросеть, а также класс, который будет ее обучать и который можно будет применить для обучения любой модели, созданной в PyTorch. В конце мы визуализируем результат, чтобы оценить качество обученной модели.
Для начала загрузим нужные библиотеки:
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
import copy
import datetime as dt
import pandas as pd
import numpy as np
Воспользуемся генератором датасета make_moons из библиотеки sklearn, который генерирует два класса в двумерном пространстве в виде полумесяца. Добавим немного шума при генерации (переменная noise)
X, y = make_moons(n_samples=150, random_state=33, noise=0.2)
Обернем данные в pandas.DataFrame для наглядности:
df = pd.DataFrame(X, columns=['x1', 'x2'])
df['target'] = y
df.head(5)
Раделим датасет на тренировочную и тестовую выборки с соотношением 6:4 при помощи функции train_test_split из библиотеки sklearn
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=33)
Визуализируем получившееся разделение:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12,5))
ax1.scatter(X_train[:,0][y_train == 0], X_train[:,1][y_train == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6)
ax1.scatter(X_train[:,0][y_train == 1], X_train[:,1][y_train == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6)
ax1.set_title('Train', fontsize=20)
ax2.scatter(X_test[:,0][y_test == 0], X_test[:,1][y_test == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6)
ax2.scatter(X_test[:,0][y_test == 1], X_test[:,1][y_test == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6)
ax2.set_title('Test', fontsize=20);
Создадим класс MyDataset, который наследуется от класса torch.utils.data.Dataset. Переопределим методы __len и __getitem, чтобы при вызове соответствующих методов мы получали объем выборки и пару значение-метка.
class MyDataset(Dataset):
def __init__(self, X, y):
self.X = torch.Tensor(X)
self.y = torch.from_numpy(y).float()
def __len__(self):
return self.X.shape[0]
def __getitem__(self, index):
return (self.X[index], self.y[index])
Сделаем нашу игрушечную нейросеть с тремя полно связными слоями. Между слоями поместим функции активации ReLU, а в третьем слое — сигмоиду.Первый слой будет принимать на вход значение x1 и x2 и на выходе давать 50 значений. Второй будет принимать 50 значений из первого слоя и подавать 50 значений на третий слой. А уже третий слой будет сжимать эти 50 значений до одного в интервале от 0 до 1 (это будет вероятность отнесения наблюдения к классу «1»).
import torch.nn as nn
ReLU = nn.ReLU()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(2, 50)
self.fc2 = nn.Linear(50, 50)
self.fc3 = nn.Linear(50, 1)
def forward(self, x):
x = ReLU(self.fc1(x))
x = ReLU(self.fc2(x))
x = torch.sigmoid(self.fc3(x))
return x.view(-1)
Создадим экземпляр нашей нейросети и выведем ее структуру. Мы увидим количество слоев, количество параметров на входе и выходе у каждого слоя и наличие\отсутствие свободного члена.
net = Net()
print(net)
Net(
(fc1): Linear(in_features=2, out_features=50, bias=True)
(fc2): Linear(in_features=50, out_features=50, bias=True)
(fc3): Linear(in_features=50, out_features=1, bias=True)
)
Теперь создадим класс, который будет похож на модели, реализованные в библиотеке sklearn. Он будет предобрабатывать данные, тренировать нашу нейросеть, запоминать ход обучения и сохранять самую успешную попытку. Описание всех параметров находится внутри класса.
In [9]:
class Trainer():
"""
Parameters:
dataset: пользовательский класс, предобрабатывающий данные
loss_f: функция потерь
learning_rate: величина градиентного шага
epoch_amount: общее количество эпох
batch_size: размер одного бача
max_batches_per_epoch: максимальное количество бачей,
подаваемых в модель в одну эпоху
device: устройство для вычислений
early_stopping: количество эпох без улучшений до остановки обучения
optim: оптимизатор
scheduler: регулятор градиентного шага
permutate: перемешивание тренировочной выборки перед обучением
Attributes:
start_model: необученная модель
best_model: модель, после обучения
train_loss: средние значения функции потерь на тренировочных
данных в каждой эпохе
val_loss: средние значения функции потерь на валидационных
данных в каждой эпохе
Methods:
fit: обучение модели
predict: возвращает предсказание обученной моделью
"""
def __init__(self, dataset, net, loss_f, learning_rate=1e-3,
epoch_amount=10, batch_size=12,
max_batches_per_epoch=None,
device='cpu', early_stopping=10,
optim=torch.optim.Adam,
scheduler=None, permutate=True):
self.loss_f = loss_f
self.learning_rate = learning_rate
self.epoch_amount = epoch_amount
self.batch_size = batch_size
self.max_batches_per_epoch = max_batches_per_epoch
self.device = device
self.early_stopping = early_stopping
self.optim = optim
self.scheduler = scheduler
self.permutate = permutate
self.dataset = dataset
self.start_model = net
self.best_model = net
self.train_loss = []
self.val_loss = []
def predict(self, X):
return self.best_model(X)
def fit(self, X_train, X_test, y_train, y_test):
Net = self.start_model
device = torch.device(self.device)
Net.to(self.device)
optimizer = self.optim(Net.parameters(), lr=self.learning_rate)
if self.scheduler is not None:
scheduler = self.scheduler(optimizer)
train = self.dataset(X_train, y_train)
val = self.dataset(X_test, y_test)
train = DataLoader(train, batch_size=self.batch_size, shuffle=self.permutate)
val = DataLoader(val, batch_size=self.batch_size, shuffle=False)
best_val_loss = float('inf') # Лучшее значение функции потерь на валидационной выборке
# функции потерь на валидационной выборке
best_ep = 0 # Эпоха, на которой достигалось лучшее
# значение функции потерь на валидационной выборке
for epoch in range(self.epoch_amount):
start = dt.datetime.now()
print(f'Эпоха: {epoch}', end=' ')
Net.train()
mean_loss = 0
batch_n = 0
for batch_X, target in train:
if self.max_batches_per_epoch is not None:
if batch_n >= self.max_batches_per_epoch:
break
optimizer.zero_grad()
batch_X = batch_X.to(self.device)
target = target.to(self.device)
predicted_values = Net(batch_X)
loss = self.loss_f(predicted_values, target)
loss.backward()
optimizer.step()
mean_loss += float(loss)
batch_n += 1
mean_loss /= batch_n
self.train_loss.append(mean_loss)
print(f'Loss_train: {mean_loss}, {dt.datetime.now() - start} сек')
Net.eval()
mean_loss = 0
batch_n = 0
with torch.no_grad():
for batch_X, target in val:
if self.max_batches_per_epoch is not None:
if batch_n >= self.max_batches_per_epoch:
break
batch_X = batch_X.to(self.device)
target = target.to(self.device)
predicted_values = Net(batch_X)
loss = self.loss_f(predicted_values, target)
mean_loss += float(loss)
batch_n += 1
mean_loss /= batch_n
self.val_loss.append(mean_loss)
print(f'Loss_val: {mean_loss}')
if mean_loss < best_val_loss:
self.best_model = Net
best_val_loss = mean_loss
best_ep = epoch
elif epoch - best_ep > self.early_stopping:
print(f'{self.early_stopping} без улучшений. Прекращаем обучение...')
break
if self.scheduler is not None:
scheduler.step()
print()
Прежде всего мы запишем все параметры обучения нашей модели в атрибуты класса внутри метода __init__. Также инициализируем два атрибута train_loss и val_loss, куда будем записывать значение функции потерь на тренировочной и валидационной выборке.
Затем создаем метод predict для удобного использования обученной модели. После создаем метод fit, который будет производить обучение нашей модели. На вход он будет принимать данные для тренировки и валидации. Внутри него помещаем нашу модель на то устройство, на котором собираемся производить вычисления (‘cpu’ — для процессора или ‘cuda:*’, где * номер вашей видеокарты. Это самые распространенные устройства, могут быть и другие).
Далее указываем оптимизатор torch.optim, который будет делать градиентные шаги. Если пользователь ничего не выбрал, будет использоваться torch.optim.Adam. Также пользователь может настроить изменение градиентных шагов в ходе обучения с помощью torch.optim.lr_scheduler, но это не обязательно.
После этого помещаем данные для тренировочной и валидационной выборки в класс, который создали ранее (в нашем случае это MyDataset).
Затем подаем наши подготовленные данные в torch.utils.data.DataLoader, указывая размер бача (сколько данных будет подаваться в модель за один раз). Классы torch.utils.data.DataLoader и torch.Tensor torch.utils.data.Dataset (от которого мы унаследовали класс MyDataset) служат для упрощения и ускорения загрузки данных и экономии памяти.
Подготовив данные и создав необходимые переменные, мы начинаем процесс обучения. Для этого переводим нашу модель в режим train. В этом состоянии будут работать различные методы регуляризации (батч-нормализация, дропаут и т.д). В нашем примере для простоты мы никаких методов регуляризации в нашу модель не добавили.
Внутри второго цикла мы перебираем батчи, которые нам отдает итератор train_dataloader. Каждый батч перемещаем на то устройство, которое мы выбрали в начале. Далее обнуляем градиент у оптимизатора, т.к. в PyTorch по умолчанию градиент будет накапливаться после каждой итерации. После этого делаем основные три действия: совершаем прямой проход по нейросети, считаем значение для функции потерь и на основе этого делаем градиентный шаг. Выводим значение функции потерь на тренировочной выборке и время, потраченное на обучение внутри одной эпохи.
Затем мы считаем значении функции потерь на валидационной выборке. Для этого мы переводим модель в режим eval и отключаем расчет градиента. После этого делаем то же, что и при обучении, но без градиентного шага. Результат функции потерь также выводим на экран. Если результат функции потерь на валидационной выборке оказался лучше, чем был, то мы сохраняем текущую модель в переменную best_model.
Все эти шаги повторяются столько раз, сколько мы указали в epoch_amount, либо до тех пор пока значение функции потерь на валидационной выборке не перестанет изменяться (если указан параметр early_stopping).
На выходе мы получим модель, которая лучшим образом показала себя на валидационной выборке, значение функции потерь на этой модели и два списка со значением функции потерь в ходе обучения на тренировочных и валидационных данных.
Теперь настало время опробовать класс на нашем датасете. В качестве функции потерь выберем бинарную кросс-энтропию torch.nn.BCELoss, а в качестве оптимизатора — стохастический градиентный спуск torch.optim.SGD.
Запишем все выбранные параметры в словарь и подадим его методу fit.
params = {
'dataset': MyDataset,
'net': net,
'epoch_amount': 1000,
'learning_rate': 1e-2,
'early_stopping': 25,
'loss_f': nn.BCELoss(),
'optim': torch.optim.SGD,
}
clf = Trainer(**params)
clf.fit(X_train, X_test, y_train, y_test)
Осталось визуализировать полученный результат.
Построим график изменения значения функции потерь на тренировочной и валидационной выборке от количества эпох.
def plot_loss(Loss_train, Loss_val):
plt.figure(figsize=(12, 5))
plt.plot(range(len(Loss_train)), Loss_train, color='orange', label='train', linestyle='--')
plt.plot(range(len(Loss_val)), Loss_val, color='blue', marker='o', label='val')
plt.legend()
plt.show()
plot_loss(clf.train_loss, clf.val_loss)
Визуализируем границу разделения.
def make_meshgrid(x1, x2, h=.02):
x1_min, x1_max = x1.min() - 2, x1.max() + 2
x2_min, x2_max = x2.min() - 2, x2.max() + 2
xx1, xx2 = np.meshgrid(np.arange(x1_min, x1_max, h), np.arange(x2_min, x2_max, h))
return xx1, xx2
def plot_contours(ax, xx1, xx2, **params):
C = clf.predict(torch.Tensor(np.c_[xx1.ravel(), xx2.ravel()])).detach().numpy()
C = C.reshape(xx1.shape)
out = ax.contourf(xx1, xx2, C, **params)
return out
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12,5))
xx1, xx2 = make_meshgrid(X[0], X[1])
ax1.scatter(X_train[:,0][y_train == 0], X_train[:,1][y_train == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6)
ax1.scatter(X_train[:,0][y_train == 1], X_train[:,1][y_train == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6)
ax1.set_title('Train', fontsize=20)
plot_contours(ax1, xx1, xx2, alpha=0.2)
ax2.scatter(X_test[:,0][y_test == 0], X_test[:,1][y_test == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6)
ax2.scatter(X_test[:,0][y_test == 1], X_test[:,1][y_test == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6)
ax2.set_title('Test', fontsize=20)
plot_contours(ax2, xx1, xx2, alpha=0.2);
Желтым цветом показана область, где вероятность встретить точку из класса «1» максимальна. Соответственно, для точки из класса «0» вероятность максимальна в красной области.Как мы видим, граница разделения получилась нелинейной, т.к. в нейросетях используются нелинейные функции активации.