Привет, Хабр!
Сегодня с вами участники профессионального сообщества NTA Промкин Михаил, Мымрин Дмитрий и Господарикова Ирина.
Одной из областей применения ИИ сегодня является автоматизация контроля за сотрудниками. В данном посте мы рассмотрим приложение технологий ML к задаче детектирования спящих людей (в частности, охранников на рабочем месте) по видеозаписям камер наблюдения.
Обсудим технические аспекты этого процесса, а также потенциальные преимущества и перспективы, которые предоставляет применение искусственного интеллекта в обеспечении безопасности на рабочем месте.
Навигация по посту
О данных
О модели
Подход к определению спящих людей
Реализация на Python
Заключение
О данных
В качестве источника данных в задаче определения спящих на рабочем месте людей напрашиваются камеры видеонаблюдения. Обычно работодатель может установить камеры в рабочих помещениях, а в случае с персоналом службы безопасности, часто их рабочие места уже находятся где‑то рядом с камерами.
Так и в нашем случае, данными для решения задачи были видеофайлы в форматах *.mp4, *.avi с рабочих мест сотрудников охраны. Наш массив данных включал в себя видеофайлы различного качества и разрешения. Общий объем материалов составил около 490 ГБ, а общее время около 590 часов. Предобработка видеоряда не производилась.
Посмотрим на данные. У нас есть видеозаписи, и на некоторых из них есть откровенно спящие люди (1). Постановка задачи в том, чтобы подсветить эти места из всех гигабайтов видео для условного «надзирателя». Записи со спящими людьми мы обозначим как объекты первого класса. Второй класс будет включать все остальные ситуации, то есть:
записи, на которых нет людей (2);
записи, на которых люди бодрствуют и активно, например, ходят (3);
записи, на которых люди в расслабленной позе, но при этом не спят (4).
Интуитивно, хочется предложить простое решение: надо просто отслеживать движение на видеозаписи, и если его нет, то помечать видеозапись как подозрительную. Но это решение, если его реализовать по‑простому, будет иметь явные минусы:
низкая точность из‑за перекрытия случаев (1) и (2);
низкая полнота из‑за возможности присутствия на видео других движущихся объектов (вентиляторов или других людей, например).
Очевидно, что для повышения качества решения нам нужно как‑то получить возможность детектирования каждого отдельного человека на изображении, и применить наше простое решение с использованием этой информации.
О модели
Ранее у нас был опыт применения модели YOLOv7 (You Only Look Once). Она показывала неплохие результаты в наших задачах, да и в сети о ней отзывались хорошо (1, 2).
YOLOv7 обладает способностью проводить детектирование объектов на видеозаписи в реальном времени. Данная модель часто используется в CV проектах, например, в робототехнике и мониторинге безопасности. Модель активно развивается и поддерживается сообществом. Она также легко настраивается по сравнению с её предыдущими аналогами, для этого достаточно передать веса модели в фреймворк.
Данная модель имеет функциональность по определению положения человеческого тела. И нам показалось интересным применить её в задаче детектирования спящих по видеозаписям.
В качестве фреймворка для глубокого обучения была использована библиотека pyTorch. Библиотека обладает достаточно простым интерфейсом и показывает хорошие результаты при работе с графическими процессорами. Простота и эффективность pyTorch позволила нам достаточно быстро приступить к разработке решения.
Подход к определению спящих людей
Для того чтобы определить спящего человека на видео мы предлагаем следующий подход:
Шаг первый: определение точек «скелетов» людей для видеофайла с помощью модели YOLOv7.
Шаг второй: разбиение видеофайла на кадры.
Шаг третий: получение точек «скелетов» по кадрам файла.
Шаг четвёртый: анализ изменения положения людей в видеофайле с целью выявления неподвижности.
Шаг пятый: формирование отчёта.
Таким образом, нам даже не нужно ничего обучать или размечать: веса модели для разметки «скелетов» уже существуют и готовы к использованию. Мы сможем применить высокую точность модели в своей задаче. Конечно, потенциально можно дообучить модель на записях конкретных камер, с определённой спецификой расположения мебели, но и готовая модель годится.
На выходе мы получаем датафрейм в котором присутствуют поминутные метки состояния объекта проверки (флаг сна и флаг отсутствия). Такой файл мы можем передать эксперту, который проверит только подозрительные фрагменты потока видеозаписей, что снизит объём человеческой работы.
Реализация на Python
Пяти шагам подхода у нас соответствуют пять фрагментов кода:
Применение YOLOv7 для получения «скелетов» (pose_detection.py).
Разбиение видеофайла на кадры (video_framer.py).
Получение «скелетов» по кадрам файла в виде таблицы (util_tools.py).
Выявление неподвижности (human_poser.py).
Создание отчетов о состоянии видеофайлов (full_detect.py).
Шаг первый
В скрипте pose_detection.py используются три функции:
plot_pose_prediction — добавляет предсказанные координаты и графические элементы на изображение.
def plot_pose_prediction(img : cv2.Mat, pred : list, thickness=2, show_bbox : bool=True) -> cv2.Mat:
bbox = xywh2xyxy(pred[:, 2:6])
for idx in range(pred.shape[0]):
plot_skeleton_kpts(img, pred[idx, 7:].T, 3)
if show_bbox:
plot_one_box(bbox[idx], img, line_thickness=thickness)
make_pose_prediction — обрабатывает изображение с помощью модели нейронной сети и возвращает список предсказанных координат.
def make_pose_prediction(model, device: str, img : cv2.Mat) -> List:
img_ = letterbox(img, 960, stride=64, auto=True)[0]
resized_shape = img_.shape[0:2]
img_ = transforms.ToTensor()(img_)
img_ = torch.tensor(np.array([img_.numpy()]))
img_ = img_.to(device).float()
with torch.no_grad():
output, _ = model(img_)
conf_threshold = 0.25
tmp = non_max_suppression_kpt(output, conf_thres=conf_threshold,
nc=model.yaml['nc'],
nkpt=model.yaml['nkpt'],
kpt_label=True)
while len(tmp[0]) == 0:
conf_threshold -= 0.001
tmp = non_max_suppression_kpt(output, conf_thres= conf_threshold,
nc=model.yaml['nc'],
nkpt=model.yaml['nkpt'],
kpt_label=True)
output = output_to_keypoint(tmp)
output = scale_pose_output(output, resized_shape, img.shape[0:2])
return output
video_pose — обрабатывает видео файл, извлекает его свойства, создает объект VideoWriter для записи видео и в цикле обрабатывает каждый кадр с помощью модели, выводя результаты на изображение.
def video_pose(filename: str, out_filename: str, model, device) -> List:
clip = VideoFileClip(filename)
duration = clip.duration
fps = clip.fps
cap = cv2.VideoCapture(filename)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'MJPG')
out = cv2.VideoWriter(out_filename, fourcc, fps, (width, height))
predictions = []
with tqdm(total=int(round(fps*duration))) as bar:
while cap.isOpened():
ret, frame = cap.read()
if ret:
pred = make_pose_prediction(model, device, frame)
predictions.append(pred[0])
plot_pose_prediction(frame, pred, show_bbox=False)
out.write(frame)
cv2.waitKey(100)
else:
break
bar.update(1)
cap.release()
out.release()
clip.close()
cv2.destroyAllWindows()
return predictions
Шаг второй
Код в файле video_framer.py отвечает за конвертацию видеофайла в формат изображений JPEG и извлечение кадров из видеофайла. Функция fetch_frames_to_jpg конвертирует каждый кадр видеофайла в формат изображений JPEG.
def fetch_frames_to_jpg(path: str) -> List:
op = "video_framer.fetch_frames_to_jpg"
try:
fps, duration = __video_info(path)
except ValueError as err:
raise ValueError(f"{op}: {err}")
frames = []
cap = cv.VideoCapture(path)
try:
with tqdm(total=int(round(fps*duration))) as bar:
while cap.isOpened():
success, frame = cap.read()
if success:
_, frame = cv.imencode('.jpeg', frame)
frames.append(frame)
else:
break
bar.update(1)
except Exception as err:
raise ValueError(f"{op}: {err}")
finally:
cap.release()
return frames
Функция fetch_frames_to_poses использует модель машинного обучения для обнаружения позы человека на каждом кадре (код объёмный, поэтому скрыли его под спойлером):
Развернуть код
def fetch_frames_to_poses(model, device, path: str, fps_user=None, frames_skip=None) -> List:
op = "video_framer.fetch_frames_to_poses"
try:
fps, duration = __video_info(path)
except ValueError as err:
raise ValueError(f"{op}: {err}")
if fps_user:
fps = fps_user
poses = []
cap = cv.VideoCapture(path)
try:
with tqdm(total=int(round(fps*duration))) as bar:
frames_counter = frames_skip
while cap.isOpened():
success, frame = cap.read()
if success:
# Skip frames
if frames_skip:
if frames_counter < frames_skip:
frames_counter += 1
bar.update(1)
continue
else:
frames_counter = 0
pose = make_pose_prediction(model, device, frame)
poses.append(pose[0])
else:
break
bar.update(1)
except Exception as err:
raise ValueError(f"{op}: {err}")
finally:
cap.release()
return poses
def __video_info(path: str) -> Tuple[int, float]:
op = "video_framer.__video_info"
try:
clip = VideoFileClip(path)
fps = clip.fps
duration = clip.duration
except Exception as err:
raise ValueError(f"{op}: {err}")
finally:
clip.close()
return fps, duration
Шаг третий
Скрипт util_tools.py отвечает за получение кадров из видеофайла и их конвертацию в формат изображений JPEG. Функция get_fps принимает путь к видеофайлу в качестве аргумента и возвращает количество кадров в видеофайле, которое можно использовать для расчета времени, затраченного на обработку видеофайла. Объёмный код под спойлером.
Развернуть код
def get_fps(video_path: str, seconds_cut: int=60) -> float:
assert os.path.exists(video_path), f'Video on path {video_path} not found'
temp_video_path = './temp' + os.path.splitext(video_path)[-1]
subprocess.run(['ffmpeg', '-i', video_path, '-to', str(seconds_cut),
'-c:v', 'copy', temp_video_path, '-loglevel', 'quiet'])
cap = cv2.VideoCapture(temp_video_path)
frames_count = 0
while cap.isOpened():
success, frame = cap.read()
if success:
frames_count += 1
else:
break
os.remove(temp_video_path)
return round(frames_count/seconds_cut, 2)
def report_by_minutes(arr_is_markup: List[bool],
fps: float, markup_border: float,
frames_nums: List[int] = None,
markup_name: str = None) -> pd.DataFrame:
# Кол-во кадров в минуте для данного видео
frames_in_minute = fps * 60
# Вычисление списка минут
if frames_nums is not None:
frames_count = frames_nums[-1]
else:
frames_count = len(arr_is_markup)
minutes = list(range(ceil(frames_count / frames_in_minute)))
# Список разметок для каждой минуты
minutes_markup = [False] * len(minutes)
# Цикл по минутам
for minute in minutes:
# Границы подсписка кадров, входящие в минуту
left_border = int(frames_in_minute * minute)
right_border = min(int(left_border + frames_in_minute), frames_count)
if frames_nums is not None:
# Номер первого кадра, больше левой границы
left_border = np.where(frames_nums >= left_border)[0][0]
# Номер последнего кадра, меньше правой границы
right_border = np.where(frames_nums <= right_border)[0][-1]
# Кол-во кадров в подсписке
frames_count_diff = right_border - left_border
# Разметка минуты
minutes_markup[minute] = (
sum(arr_is_markup[left_border:right_border]) / frames_count_diff
> markup_border
)
if not markup_name:
markup_name = 'Отметка'
df_report = pd.DataFrame(np.column_stack((
minutes,
minutes_markup
)), columns=['Минута', markup_name])
df_report[markup_name] = df_report[markup_name].astype('bool')
return df_report
Шаг четвёртый
После подготовки и разметки нашего датасета с видеофайлами мы перешли непосредственно к анализу результатов работы модели (скрипт human_poser.py).
Загрузим модель YOLOv7 с поддержкой тензорных типов CUDA, которые реализуют те же функции, что и тензоры CPU (процессорные ядра), но с использованием GPU (графические ядра) для вычислений. Затем отключим некоторые определенные слои или части модели, которые ведут себя по‑разному во время обучения и вывода данных («model.eval()»).
Код представлен ниже:
# Установили количество ядер графического процессора, если это возможно
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print('Device', device)
# Загрузка модели
model = attempt_load('weights/yolov7-w6-pose.pt')
# Переключиться в режим оценки, map_location=device
model.eval()
print('Number of classes:', model.yaml['nc'])
print('Number of keypoints:', model.yaml['nkpt']))
Для определения того что человек на видео действительно спит, мы отбросили ключевые точки ног и тела. Это связано с тем что позиция ног во время сна может меняться (например, нога может быть согнута в колене). Данный факт может запутать результаты. Поэтому мы ориентировались на подвижность плечевого пояса человека на видео.
В приведенном ниже коде происходит анализ видеофайлов и создание отчетов о состоянии видеофайлов. Он использует модули cv2 и pandas для обработки изображений и создания таблиц соответственно.
Развернуть код
def process_files(process_paths: Iterable[str]):
for poses_path in tqdm(poses_paths):
fps, video_name = validate_path(poses_path)
if not fps:
continue
report = process_file(poses_path, get_params(fps), fps)
reports_folder = 'reports'
os.makedirs(reports_folder, exist_ok=True)
report.to_excel(f'{reports_folder}/report_{video_name}.xlsx', index=False)
def validate_path(poses_path: str) -> Tuple([str], str):
if not os.path.exists(poses_path):
print(f'CSV file with poses not find: {poses_path}')
return None
video_name = os.path.basename(poses_path).split('_poses_skip_')[0]
if video_name not in dict_video_fps:
print(f'FPS undefined for: {poses_path}')
return None
return dict_video_fps[video_name], video_name
def process_file(poses_path: str, sleep_detect_parametrs: list, fps: float):
df_poses = pd.read_csv(poses_path)
columns = list(df_poses.columns[8:])
poses = np.array(df_poses[columns].values.tolist())
predictions = np.array([True]*len(poses))
for ind, params in enumerate(sleep_detect_parametrs):
pairs = check_sleeping(poses, **params)
pairs_predictions = np.array([False]*len(predictions))
for pair in pairs:
pairs_predictions[pair[0]:pair[1]] = True
predictions &= pairs_predictions
poses_filter = get_poses_filter(df_poses)
pred = predictions & poses_filter
return render_report(df_poses, poses_filter, pred, fps)
def get_params(fps: float) -> list:
return [{
'body_part_probability': 0.2,
'radius': 40,
# 'crit_time': 5,
# 'frames_movement_count': 5,
# 'crit_time': int(fps),
'frames_movement_count': int(fps),
'body_parts': ['нос', 'прав глаз', 'лев глаз', 'лев ухо', 'прав ухо']
}, {
'body_part_probability': 0.6,
'radius': 20,
'crit_time': int(fps * 0.5),
'frames_movement_count': int(fps * 0.5),
'body_parts': ['лев запястье', 'прав запястье'],
}]
Шаг пятый
И наконец, код из скрипта full_detect.py создает отчеты о состоянии видеофайлов на основе отсутствия движения, сна и других факторов.
Развернуть код
def render_report(df_poses: pd.DataFrame, poses_filter, pred: set, fps: float) -> pd.DataFrame:
df_nogo_filter = report_by_minutes(
poses_filter,
fps,
markup_border=0.1,
frames_nums=df_poses['#'].to_numpy(),
markup_name='Фильтр ного',
)
df_nogo_filter['Фильтр ного'] = df_nogo_filter['Фильтр ного'].apply(lambda x: 'x' if x else '')
df_sleep_report = report_by_minutes(
pred,
fps,
markup_border=0.1,
frames_nums=df_poses['#'].to_numpy(),
markup_name='Спит',
)
df_sleep_report['Спит'] = df_sleep_report['Спит'].apply(lambda x: 'x' if x else '')
df_absent_report = report_by_minutes(
df_poses['box_p'] < 0.4,
fps,
markup_border=0.6,
frames_nums=df_poses['#'].to_numpy(),
markup_name='Отсутствует'
)
df_absent_report['Отсутствует'] = df_absent_report['Отсутствует'].apply(lambda x: 'x' if x else '')
df_report = df_sleep_report.merge(df_absent_report, on='Минута')
df_report = df_nogo_filter.merge(df_report, on='Минута')
df_report['Время'] = df_report['Минута'].apply(lambda x: f'{x//60:02d}:{x%60:02d}:__')
df_report.drop('Минута', axis=1, inplace=True)
df_report = df_report[['Время'] + df_report.columns[:-1].tolist()]
return df_report
Таким образом наше решение может определить спящих людей на видео и предоставить экспертам данные в виде таймкодов и названий файлов. Эксперту необходимо самостоятельно определить права ли модель.
Пример результата работы программы:
Минута | Фильтр ног | Спит | Отсутствует |
0 | x | x | |
1 | x | x | |
2 | x | x | |
3 | x | ||
4 | x | x | |
5 | x | x | |
6 | x | x |
Заключение
Модель YOLOv7 имеет большой спектр применений, и сегодня мы показали одно из них. Этот подход позволяет перенести качество весов конкретной модели на прикладную задачу, избегая затрат на дообучение и разметку данных.
Получившийся алгоритм можно интегрировать в системы мониторинга исполнения сотрудниками службы безопасности своих обязанностей, что в целом должно способствовать повышению уровня безопасности для клиентов и сотрудников.
Если у вас есть мысли о применении более эффективного похода для решения задач компьютерного зрения в рамках обеспечения порядка исполнения обязанностей на рабочем месте, делитесь им в комментариях.