Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Привет, Хабр!
Это одна из трех статей, в которых я (автор канала Зайцем по ХаХатонам) рассказываю о задачах Всеросийского чемпионата Цифрового Прорыва, объясняю базовые решения (baseline) и даю советы, которые помогут подняться выше по рейтингу. В данной статье будет рассмотрен кейс от НИИАС-РЖД, в котором необходимо разработать алгоритм определения железнодорожной колеи и подвижного состава для предотвращения чрезвычайных ситуаций на железной дороге.
Спойлер: в конце статьи есть советы для улучшения базового решения.
Цифровой Прорыв
Думаю, все и так знают, что такое Цифровой Прорыв. Однако, напомню, что в этом году основной тематикой стал искусственный интеллект. И сезон этого года в самом разгаре!
Хоть часть мероприятий уже прошла, впереди участников ждет ещё 19 региональных чемпионатов, 5 окружных хакатонов и 3 всероссийских чемпионата. Советую присоединиться ко мне и другим участникам, чтобы не упустить возможность выиграть денежные призы и крутые путешествия, а также набраться опыта на самых разных задачах.
Введение
Автоматизация функций управления и обеспечения безопасности за счет внедрения технических средств — всё это способы повышения уровня безопасности на железнодорожном транспорте.
Внедрение новой техники и технологий автоматизации позволяет исключить некоторые опасные технологические операции и значительно изменить характер работы многих сотрудников железной дороги. Внедрение блоков определения препятствий на основе видеоаналитики позволяет вести дополнительный визуальный контроль пространства перед поездом, определять путь и направление движения. Применение камер различного диапазона видимости и фокусного расстояния увеличивает возможности системы технического зрения и превышает возможности человека по скорости реакции, дальности определения препятствий, в том числе в сложных погодных условиях (ночь, дождь, снег, туман, задымленность, ослепление солнечным светом, переход из затемненных участков на освещенные).
Создание интеллектуальных систем, предупреждающих машиниста о возможном столкновении с потенциально опасными объектами, содержит в себе несколько первостепенных задач: определения колеи и подвижного состава.
Условие задачи
В рамках чемпионата требуется создать алгоритм, определяющий элементы дорожной инфраструктуры: колею (рельсошпальную решетку) и подвижной состав (локомотивы, грузовые вагоны, пассажирские вагоны).
Данные
train/images — папка, содержащая 8203 трехканальных RGB изображений для обучения.
train/mask — папка, содержащая 8203 масок той же размерности со сегментированной инфраструктурой .
test/— папка, содержащая 1000 фотографий для классификации;
Ожидаемое решение
На вход модели подается 1000 картинок, ответом должно стать 1000 масок изображений. Вам необходимо загрузить именно 1000, чтобы не было ошибки. Для корректной проверки требуется, чтобы найденные маски имели те же названия, что и оригинальные изображения с расширением “.png“
На что стоит обратить внимание
Стоит отметить, что изображения хоть и сделаны с одной точки и на постоянной высоте, при этом имеют большую размерность и сделаны в разных погодных условиях: ночью, в дождь, в снег, во время тумана, задымленности.
Метрика
В качестве метрики в задаче выступает индекс оценки семантической сегментации или же MIoU. Подробнее о метрике можно почитать тут.
Подробно о решении
Методология решения
Какие библиотеки нам нужны
В начале нужно установить все нестандартные модули.
# Catalyst
!pip install catalyst==20.12
# for augmentations
!pip install albumentations==0.4.3
# for pretrained segmentation models for PyTorch
!pip install segmentation-models-pytorch==0.1.0
# for TTA
!pip install ttach==0.0.2
# for tensorboard
!pip install tensorflow
# if Your machine support Apex FP16, uncomment this 3 lines below
# !git clone https://github.com/NVIDIA/apex
# !pip install -v --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" ./apex
# is_fp16_used = True
Далее импортируем библиотеки, которые понадобятся нам в решении. И фиксируем random_seed, чтобы результаты получились воспроизводимыми.
Из важных библиотек хочется отметить следующие:
torch - основная библиотека для создания моделей глубокого обучения (нейросетей)
catalyst - библиотека для упрощения и ускорения создания моделей
segmentation_models_pytorch - библиотека, в который собраны модели и функции для решения задач сегментации (разработана нашим соотечественником)
albumentations - библиотека для аугментации данных
from typing import Callable, List, Tuple
import random
import collections
import matplotlib.pyplot as plt
import numpy as np
from skimage.io import imread as gif_imread
from sklearn.model_selection import train_test_split
from pathlib import Path
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torch import optim
import catalyst
from catalyst import utils
from catalyst.contrib.nn import DiceLoss, IoULoss
from catalyst.contrib.nn import RAdam, Lookahead
from catalyst.dl import SupervisedRunner
from catalyst.dl import DiceCallback, IouCallback, \
CriterionCallback, MetricAggregationCallback
from catalyst.contrib.callbacks import DrawMasksCallback
import segmentation_models_pytorch as smp
import albumentations as albu
from albumentations.pytorch import ToTensor
print(f"torch: {torch.__version__}, catalyst: {catalyst.__version__}")
SEED = 42
utils.set_global_seed(SEED)
utils.prepare_cudnn(deterministic=True)
import warnings
warnings.filterwarnings("ignore")
Преобразование начального датасета
Объявим пути до данных и соберем пути до всех изображений и масок из тренировочной выборки.
ROOT = Path("/content/")
train_image_path = ROOT / "train"
train_mask_path = ROOT / "train_masks"
test_image_path = ROOT / "test"
ALL_IMAGES = sorted(train_image_path.glob("*.png"))
print('Images:', len(ALL_IMAGES))
ALL_MASKS = sorted(train_mask_path.glob("*.png"))
print('Masks:', len(ALL_MASKS))
Визуализация данных
Напишем функции для визуализации данных.
def show_examples(name: str, image: np.ndarray, mask: np.ndarray):
plt.figure(figsize=(10, 14))
plt.subplot(1, 2, 1)
plt.imshow(image)
plt.title(f"Image: {name}")
plt.subplot(1, 2, 2)
plt.imshow(mask)
plt.title(f"Mask: {name}")
def show(index: int, images: List[Path], masks: List[Path], transforms=None) -> None:
image_path = images[index]
name = image_path.name
image = utils.imread(image_path)
mask = gif_imread(masks[index])
if transforms is not None:
temp = transforms(image=image, mask=mask)
image = temp["image"]
mask = temp["mask"]
show_examples(name, image, mask)
def show_random(images: List[Path], masks: List[Path], transforms=None) -> None:
length = len(images)
index = random.randint(0, length - 1)
show(index, images, masks, transforms)
Визуализируем случайное изображение.
show_random(ALL_IMAGES, ALL_MASKS)
Аугментация входных данных
Аугментации нужны для того, чтобы добавить разнообразия в данные и снизить вероятность переобучения модели, при этом повысив качество ее работы на тестовом наборе данных.
В базовом решении используются такие аугментации, как повороты на 90 градусов, вырезки кусочком изображения, случайная яркость и контрастность, различные искажения и шумы, изменение цветовых характеристик, кропы.
def pre_transforms(image_size=224):
return [albu.Resize(image_size, image_size, p=1)]
def hard_transforms():
result = [
albu.RandomRotate90(),
albu.Cutout(),
albu.RandomBrightnessContrast(
brightness_limit=0.2, contrast_limit=0.2, p=0.3
),
albu.GridDistortion(p=0.3),
albu.HueSaturationValue(p=0.3)
]
return result
def resize_transforms(image_size=224):
BORDER_CONSTANT = 0
pre_size = int(image_size * 1.5)
random_crop = albu.Compose([
albu.SmallestMaxSize(pre_size, p=1),
albu.RandomCrop(
image_size, image_size, p=1
)
])
rescale = albu.Compose([albu.Resize(image_size, image_size, p=1)])
random_crop_big = albu.Compose([
albu.LongestMaxSize(pre_size, p=1),
albu.RandomCrop(
image_size, image_size, p=1
)
])
# Converts the image to a square of size image_size x image_size
result = [
albu.OneOf([
random_crop,
rescale,
random_crop_big
], p=1)
]
return result
def post_transforms():
# we use ImageNet image normalization
# and convert it to torch.Tensor
return [albu.Normalize(), ToTensor()]
def compose(transforms_to_compose):
# combine all augmentations into single pipeline
result = albu.Compose([
item for sublist in transforms_to_compose for item in sublist
])
return result
Объявим тренировочные, валидационные и тестовые трансформации для дальнейшего переиспользования.
train_transforms = compose([
resize_transforms(),
hard_transforms(),
post_transforms()
])
valid_transforms = compose([pre_transforms(), post_transforms()])
show_transforms = compose([resize_transforms(), hard_transforms()])
Посмотрим как выглядит изображение после аугментации.
show_random(ALL_IMAGES, ALL_MASKS, transforms=show_transforms)
Загрузчик данных
На вход в модель данные должны подаваться в определенном формате. Поэтому для начала создадим класс SegmentationDataset, который будет приводить данные в нужный формат.
class SegmentationDataset(Dataset):
def __init__(
self,
images: List[Path],
masks: List[Path] = None,
transforms=None
) -> None:
self.images = images
self.masks = masks
self.transforms = transforms
def __len__(self) -> int:
return len(self.images)
def __getitem__(self, idx: int) -> dict:
image_path = self.images[idx]
image = utils.imread(image_path)
result = {"image": image}
if self.masks is not None:
mask = gif_imread(self.masks[idx])
result["mask"] = mask
if self.transforms is not None:
result = self.transforms(**result)
result["filename"] = image_path.name
return result
После напишем функцию, которая создает и возвращает загрузчики данных (dataloader).
def get_loaders(
images: List[Path],
masks: List[Path],
random_state: int,
valid_size: float = 0.5,
batch_size: int = 32,
num_workers: int = 4,
train_transforms_fn = None,
valid_transforms_fn = None,
) -> dict:
indices = np.arange(len(images))
# Let's divide the data set into train and valid parts.
train_indices, valid_indices = train_test_split(
indices, test_size=valid_size, random_state=random_state, shuffle=True
)
np_images = np.array(images)
np_masks = np.array(masks)
# Creates our train dataset
train_dataset = SegmentationDataset(
images = np_images[train_indices].tolist(),
masks = np_masks[train_indices].tolist(),
transforms = train_transforms_fn
)
# Creates our valid dataset
valid_dataset = SegmentationDataset(
images = np_images[valid_indices].tolist(),
masks = np_masks[valid_indices].tolist(),
transforms = valid_transforms_fn
)
# Catalyst uses normal torch.data.DataLoader
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
drop_last=True,
)
valid_loader = DataLoader(
valid_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
drop_last=True,
)
# And excpect to get an OrderedDict of loaders
loaders = collections.OrderedDict()
loaders["train"] = train_loader
loaders["valid"] = valid_loader
return loaders
Объявим batch_size и получим загрузчики данных при помощи созданной ранее функции.
batch_size = 16
print(f"batch_size: {batch_size}")
loaders = get_loaders(
images=ALL_IMAGES,
masks=ALL_MASKS,
random_state=SEED,
train_transforms_fn=train_transforms,
valid_transforms_fn=valid_transforms,
batch_size=batch_size
)
Создание и обучение модели
Перейдем к этапу создания модели. Как я уже упоминал на этапе импорта библиотек в модель мы берем из segmentation_models_pytorch. Посмотреть все доступные модели, функции потерь и метрики можно в гитхабе данной библиотеки.
В базовом решении используется модель архитектуры FPN - Feature Pyramid Network с resnext50 в backbone.
Также нельзя не заметить, что мы создаем модель с выходным слоем для одного класса. Сделано это исключительно для упрощения базового решения, которое я менять не могу в силу правил соревнования. Но для боевого решения нужно будет изменить количество классов на 3.
# We will use Feature Pyramid Network with pre-trained ResNeXt50 backbone
model = smp.FPN(encoder_name="resnext50_32x4d", classes=1)
Для обучения используется составная функция потерь, состоящая из DiceLoss, IoULoss и BinaryCrossEntropy.
# we have multiple criterions
criterion = {
"dice": DiceLoss(),
"iou": IoULoss(),
"bce": nn.BCEWithLogitsLoss()
}
Объявим остальные параметры необходимы для обучения модели. Оптимизатор - RAdam, функция для уменьшения learning_rate - ReduceLROnPlateau.
learning_rate = 0.001
encoder_learning_rate = 0.0005
# Since we use a pre-trained encoder, we will reduce the learning rate on it.
layerwise_params = {"encoder*": dict(lr=encoder_learning_rate, weight_decay=0.00003)}
# This function removes weight_decay for biases and applies our layerwise_params
model_params = utils.process_model_params(model, layerwise_params=layerwise_params)
# Catalyst has new SOTA optimizers out of box
base_optimizer = RAdam(model_params, lr=learning_rate, weight_decay=0.0003)
optimizer = Lookahead(base_optimizer)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.25, patience=2)
Для проверки работоспособности решения обучим модель всего на 3 эпохах.
num_epochs = 3
logdir = "./logs/segmentation"
device = utils.get_device()
print(f"device: {device}")
#fp16_params = dict(opt_level="O1") # params for FP16
fp16_params = None
print(f"FP16 params: {fp16_params}")
# by default SupervisedRunner uses "features" and "targets",
# in our case we get "image" and "mask" keys in dataset __getitem__
runner = SupervisedRunner(device=device, input_key="image", input_target_key="mask")
Объявим callbacks для рассчета функции потерь. И запустим обучение.
callbacks = [
# Each criterion is calculated separately.
CriterionCallback(
input_key="mask",
prefix="loss_dice",
criterion_key="dice"
),
CriterionCallback(
input_key="mask",
prefix="loss_iou",
criterion_key="iou"
),
CriterionCallback(
input_key="mask",
prefix="loss_bce",
criterion_key="bce"
),
# And only then we aggregate everything into one loss.
MetricAggregationCallback(
prefix="loss",
mode="weighted_sum", # can be "sum", "weighted_sum" or "mean"
# because we want weighted sum, we need to add scale for each loss
metrics={"loss_dice": 1.0, "loss_iou": 1.0, "loss_bce": 0.8},
),
# metrics
DiceCallback(input_key="mask"),
IouCallback(input_key="mask"),
# visualization
DrawMasksCallback(output_key='logits',
input_image_key='image',
input_mask_key='mask',
summary_step=50
)
]
runner.train(
model=model,
criterion=criterion,
optimizer=optimizer,
scheduler=scheduler,
# our dataloaders
loaders=loaders,
# We can specify the callbacks list for the experiment;
callbacks=callbacks,
# path to save logs
logdir=logdir,
num_epochs=num_epochs,
# save our best checkpoint by IoU metric
main_metric="iou",
# IoU needs to be maximized.
minimize_metric=False,
# for FP16. It uses the variable from the very first cell
fp16=fp16_params,
# prints train logs
verbose=True,
)
Проверка модели
После того, как мы обучили модель, остается только создать предсказания на тестовых данных. Для этого создадим загрузчик данных для тестовых файлов и прогоним его через модель.
TEST_IMAGES = sorted(test_image_path.glob("*.png"))
# create test dataset
test_dataset = SegmentationDataset(
TEST_IMAGES,
transforms=valid_transforms
)
num_workers: int = 4
infer_loader = DataLoader(
test_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers
)
# this get predictions for the whole loader
predictions = np.vstack(list(map(
lambda x: x["logits"].cpu().numpy(),
runner.predict_loader(loader=infer_loader, resume=f"logs/segmentation/checkpoints/best.pth")
)))
Визуализируем полученные результаты. Напомню, что сейчас мы предсказываем только один класс. К тому же, я закомментировал строку с преобразованием маски из вероятностей в целые числа, чтобы результат был более наглядным.
threshold = 0.5
max_count = 1
for i, (features, logits) in enumerate(zip(test_dataset, predictions)):
image = utils.tensor_to_ndimage(features["image"])
mask_ = torch.from_numpy(logits[0]).sigmoid()
#mask = utils.detach(mask_ > threshold).astype("float")
show_examples(name="", image=image, mask=mask_)
if i >= max_count:
break
Рекомендации по улучшению решения
Начать, конечно же, стоит с изменения количества классов с 1 на 3. Это даст возможность получить валидные предсказания и засылать их на платформу.
Следующим шагом я бы посоветовал увеличить количество эпох и дообучить модель, так как 3 эпох явно недостаточно.
Попробовать другие архитектуры моделей, например популярный в соревнованиях по анализу данных Unet.
Создать ансамбль моделей и попробовать применить метод TTA.
Изучить используемые аугментации и, возможно, как-то их улучшить.
Поиграться с размером подаваемых в модель изображений.
Итоги
Данный кейс является классической задачей на сегментацию. Соответственно и методы решения можно брать из других соревнований с задачами сегментации, шансы, что это поможет улучшить результат крайне высок.
В дополнение к данной серии статей на канале Зайцем по ХаХатонам будет проведена трансляция с разбором решений и возможностью задать вопросы.
Всем удачи на чемпионатах и хакатонах!