Task-Сервер на «Flask»: как заставить несколько компьютеров решать одну задачу

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Привет, Хабр!

Подготовительным этапом для видеоаналитики с применением методов машинного обучения - является выгрузка записей из видеорегистраторов Hikvision, что является достаточно длительной задачей, особенно если регистраторов несколько, давайте попытаемся разобраться как выполнить эту работу быстрее и удобнее, используя сервер.

Основанием для разработки «Task-Сервера» -  послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.

 Для начала немного о «Task-Сервере» (Рис. 1), в данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.

Рис. 1 – схема Task-Сервера
Рис. 1 – схема Task-Сервера

В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask, нам достаточно небольшого веб-приложения для нашей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа.  В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.

Проблема

Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.

Рис. 2 Процесс выгрузки видео
Рис. 2 Процесс выгрузки видео

Решение:

Начнем работу с создания сервера, для начала создаем новое окружение под приложение следующей командой в терминале:

python –m venv venv

Так как решение было использовать Flask то подключаем следующие модули:

1.     «Flask» -  для создания веб-приложения

2.     «openpyxl» – непосредственно для работы с excel файлом, где находятся команды.

3.     «multiprocessing» - из этого модуля нам потребуются только очереди, так как они идеально подходят для нашей задачи 

Для начала импортируем библиотеку Flask и инициализируем приложение:

from flask import Flask
app = Flask(__name__)

Импортируем остальные библиотеки:

from openpyxl import load_workbook
from multiprocessing import Queue

Создаем и наполняем базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса к которому подключаемся, user - логина, password - пароля, cameras - номера камеры. По умолчанию если не задавать дату и время - то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload --help» (рис. 3)

Рис. 3 Все параметры для выгрузки с Hikload
Рис. 3 Все параметры для выгрузки с Hikload

Далее добавим функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).

data = load_workbook('test.xlsx', 'wb') 
sheet = data.active
que = Queue()
result = []

#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
    for cell in row:
        result.append(cell.value)
    que.put(result)
    result = []

#выдача команд из очереди    
def get_task():
    return que.get()

#отправка команд клиентам.

Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление, описанное ниже и вызывается экземпляром приложения app(Flask)  

@app.route('/')
@app.route('/index')
def index():
	return ' '.join(map(str, get_task()))

Весь код сервера представлен ниже:

from flask import Flask

app = Flask(__name__)

from openpyxl import load_workbook
from multiprocessing import Queue

data = load_workbook('test.xlsx', 'wb') 
sheet = data.active
que = Queue()
result = []

#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
    for cell in row:
        result.append(cell.value)
    que.put(result)
    result = []

#выдача команд из очереди    
def get_task():
    return que.get()


#отправка команд клиентам    
@app.route('/')
@app.route('/index')
def index():
    if not que.empty():
        return ' '.join(map(str, get_task()))
    else:
        return

if __name__ == '__main__':
    app.run(debug=False)

Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:

1.                «Requests» для обращения к серверу и получения задачи.

def get_task():
    for _ in range(2):
        resp = rq.get('localhost')
        if resp != “the end”:
            tasks.append(list((resp.content.decode('utf-8').split(' '))))
        else:
            return None
    return (tasks)
  1. «Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.

if __name__ == '__main__':
    while True:
	   tasks = [] #создаем пустой список для задач
        response = get_task()
        NUM_CORE = 2 #объявляем количество ядер
        read = mp.Queue()
        [read.put(x) for x in response] #записываем очередь командами
        print("read qsize", read.qsize()) 
        for i in range(NUM_CORE):   
            mp.Process(target=load, args=(read,)).start() #запускаем скрипт
		 mp.join()
        while True:
            time.sleep(1)
  1. Модуль «os» непосредственно, для запуска Hikload.

def load(read):
    while True:
        if not read.empty(): #проверяем заполненность очереди
            x = read.get() 
            name_proc = mp.current_process().name
            print(f"{name_proc} stared") #выводим работающие процессы
            task = (f'hikload --server={x[2]} ' 
                      f'--user={x[0]} '
                      f'--password={x[1]} '
                      f'--cameras={x[3]} '
                      f'--downloads {x[2]+ "_" + name_proc}'
                    )
            print(task)
            '''Запуск скрипта с нашими командами'''
            os.system(task)
        else: 
            break

Код клиента целиком выглядит следующим образом:

import multiprocessing as mp, os, time, requests as rq

#получаем задачи с сервера
def get_task():
    for _ in range(2):
        resp = rq.get('http://127.0.0.1:5000/')
        if resp != 'the end':
            tasks.append(list((resp.content.decode('utf-8').split(' '))))
        else:
            return None
    return (tasks)

#запуск выгрузки видеозаписей      
def load(read):
    while True:
        if not read.empty(): #проверяем заполненность очереди
            x = read.get() 
            name_proc = mp.current_process().name
            print(f"{name_proc} stared") #выводим работающие процессы
            task = (f'hikload --server={x[2]} ' 
                      f'--user={x[0]} '
                      f'--password={x[1]} '
                      f'--cameras={x[3] '
                      f'--downloads {x[2]+ "_" + name_proc}'
                    )
            print(task)
            '''Запуск скрипта с нашими командами'''
            os.system(task)
        else: 
            break

if __name__ == '__main__':
    while True:
        tasks = []
        response = get_task()
        NUM_CORE = 2 #объявляем количество ядер
        read = mp.Queue()
        get_task() #вызываем функцию загрузки задач
        [read.put(x) for x in tasks] #записываем очередь командами
        print("read qsize", read.qsize()) 
        for i in range(NUM_CORE):   
            mp.Process(target=load, args=(read,)).start() #запускаем скрипт
            mp.join()
        while True:
            time.sleep(1)

Для начала запускаем сервер (Рис. 5) на локальном хосте, следующей командой: flask run

Рис. 5 Запуск сервера на локальном хосте
Рис. 5 Запуск сервера на локальном хосте

Далее запускаем клиент, команда: python main.py:

Рис. 6 Запуск клиента и начало загрузки видеозаписей
Рис. 6 Запуск клиента и начало загрузки видеозаписей

Мы видим, что очередь автоматически заполнилась 2-мя задачами (Рис. 6) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 7)

Рис. 7 Ответ сервера на запрос
Рис. 7 Ответ сервера на запрос

Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром --download (рис.8). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.

Рис. 8 Папки с выгруженными данными
Рис. 8 Папки с выгруженными данными

И, конечно, вывод

В результате мы реализовали автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.9)

Рис. 9 Выполнение задач в командной строке
Рис. 9 Выполнение задач в командной строке

Из явных преимуществ такого подхода можно выделить:

1.     Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом

2.     Все задачи распределяются автоматически

3.     Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач

Источник: https://habr.com/ru/post/687032/


Интересные статьи

Интересные статьи

Привет, мы команда СберМегаМаркета, и это обзорная статья о нашей площадке, пробный камень для блога Хабре. За нашими плечами спешный переезд с PHP на GO, ребрендинг и решение таких задач, с которыми ...
В статье пойдет речь о том как вывести аудио, проигрываемое на raspberry pi на несколько источников (проигрывателей) одновременно. В частности, аудио будет параллельно транслироваться по hdmi, на au...
Несколько несложных примеров того, как на практике можно использовать продвинутые возможности Helm для эффективной организации безупречной continuous delivery в Kubernete...
В 1С Битрикс есть специальные сущности под названием “Информационные блоки, сокращенно (инфоблоки)“, я думаю каждый с ними знаком, но не каждый понимает, что это такое и для чего они нужны
Автокэширование в 1с-Битрикс — хорошо развитая и довольно сложная система, позволяющая в разы уменьшить число обращений к базе данных и ускорить выполнение страниц.