Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Привет, Хабр!
Меня зовут Илья Родин, я инженер по анализу данных в «Рексофт». Сейчас я работаю в команде департамента горнодобывающих решений компании, поэтому пишу о насущном: как усовершенствовать контроль работы циклов погрузчика, забирая данные из штатной диагностической системы двигателя. Эту статью я писал вместе со своим коллегой @VLebedev_22.
Итак, поехали!
Постановка задачи
На горнодобывающих производствах, в строительстве и в других сферах промышленности широко используется техника для погрузки и перемещения горной массы или сыпучих материалов, например, фронтальные погрузчики и экскаваторы. Одна из важных задач управления таким производством – отслеживать время работы машины, длительность простоев, производительность, желательно с привязкой к конкретному водителю/оператору. Это помогает осуществлять контроль операционной эффективности.
При традиционном подходе к автоматизации контроля, требуется устанавливать дополнительные датчики, которые отслеживали бы положение рабочих органов машины, а также измеряли давление в гидравлических контурах или силу тока на электроприводах (если это канатный экскаватор). На основе этих сигналов выстраивается алгоритм, который в состоянии считать рабочие операции, и который может отделить «холостые» циклы от циклов с полезной нагрузкой (в нашем опыте встречались предприимчивые операторы, которые разгадывали логику простых алгоритмов и могли по-быстрому заработать себе дополнительных очков производительности, совершая манипуляции пустым ковшом).
Надо ли говорить, что множество дополнительных датчиков и проводов является слабой стороной такой системы? Распространённый сценарий: предприятие внедряет систему диспетчеризации и управления парком техники, система вводится в эксплуатацию, а через год уже 30% датчиков не работает, поддерживать работоспособность системы оказывается трудоёмко, операционный контроль снижается.
В другой статье как раз @VLebedev_22 уже рассказывал о том, как можно решать эту задачу с помощью машинного зрения. Но наша мысль не стоит на месте. Мы предположили, что можно попробовать обойтись ещё меньшим количеством оборудования, забирая данные из штатной диагностической системы двигателя. Об этом и поведем рассказ.
Современные машины обладают достаточно богатым набором диагностической телеметрии двигателя, и от одной машины к другой протокол CAN и набор параметров в целом не меняются (везде есть стандартные «обороты», «крутящий момент» и т.п.). Значит, можно попробовать поискать паттерны поведения работы двигателя в зависимости от выполняемой работы. Кажется, очевидно, что работа на холостом ходу будет явно отличаться от работы под нагрузкой. А получится ли считать рабочие циклы (рейсы)? Отделять погрузку от разгрузки?
Конкретизируем задачу. В течение рабочей смены фронтальный погрузчик совершает некоторое количество рабочих рейсов. Рейс – это последовательность действий: загрузка горной породы в ковш, ее транспортировка и выгрузка.
Задача состоит в том, чтобы по временным рядам показателей различных штатных датчиков, автоматически подсчитать количество совершенных рейсов. Дополнительное пожелание: постараемся обойтись как можно меньшим количеством датчиков, а также использовать наиболее простые и стандартные из них, чтобы унифицировать алгоритм для других моделей фронтальных погрузчиков.
Еще одно требование заключается в том, что подсчет рейсов должен вестись в режиме реального времени.
Данные
У нас есть логи с CAN-шины, включающие в себя довольно стандартные показатели:
sensors = [
'Текущий момент двигателя',
'Обороты двигателя',
'Положение педали газа',
'Максимально доступный момент на двигателе',
'Номинальное трение — крутящий момент в процентах',
'Напряжение на выключателе АКБ',
'Температура охлаждающей жидкости',
'Давление на входе в коллектор двигателя 1',
'Давление моторного масла',
'Температура выхлопных газов двигателя — правый коллектор'
]
Также есть разметка этих логов: для каждой временной точки мы знаем соответствующее ей состояние погрузчика. Возможных состояний у нас 4: пустой, погрузка, загруженный, разгрузка:
from enum import Enum
class LoaderState(Enum):
EMPTY = 0
LOADING = 2
LOADED = 1
UNLOADING = 3
Вот график с участком временного ряда одного из наших показателей (обороты двигателя) с наложенными состояниями погрузчика:
Состояния на этом графике окрашены зеленым, фиолетовым, серым и желтым цветами (см. легенду). Видно, что график показателя (оборотов двигателя в данном случае) меняется с изменением состояний погрузчика, особенно хорошо это заметно по «всплескам» при загрузке (состояние LOADING фиолетового цвета). А вот, например, при разгрузке (UNLOADING желтого цвета) всплески не так заметны (спойлер: и в дальнейшем нам придется хитро извернуться, чтобы правильно определять это состояние).
Таким образом, мы имеем задачу supervised learning со спецификой временных рядов: для новых данных – временных рядов выбранных показателей с датчиков на погрузчике – нам надо определить соответствующие каждой временной точке состояния погрузчика, чтобы затем рассчитать количество совершенных этим погрузчиком рейсов.
Другими словами, это задача сегментации временных рядов.
Feature engineering
Фичи из наших временных рядов будем генерировать автоматически с помощью библиотеки tsfresh:
import tsfresh
from tsfresh import extract_features as extract_tsfresh_features, select_features
from tsfresh.utilities.dataframe_functions import impute
from tsfresh.feature_extraction import settings as tsfresh_settings
settings = tsfresh_settings.MinimalFCParameters()
settings.update(
tsfresh_settings.TimeBasedFCParameters()
)
settings.update(
tsfresh_settings.EfficientFCParameters()
)
def extract_features(df: pd.DataFrame) -> pd.DataFrame:
# Фиктивная колонка для группировки
copy_df = df.copy()
group_col = "tsfresh"
copy_df.loc[:, group_col] = 0
return extract_tsfresh_features(
copy_df,
column_id=group_col,
impute_function=impute,
default_fc_parameters=settings
)
Также tsfresh из коробки умеет отобрать фичи с помощью метода relevant_features. Удобно обернуть отбор фич в отдельный класс, совместимый с sklearn Pipeline:
from sklearn.base import BaseEstimator, TransformerMixin
class FeatureSelector(BaseEstimator, TransformerMixin):
def fit(self, X, y=None, **kwargs):
relevant_features = set()
for label in np.unique(y):
# select_features работает с бинарной классификацией, поэтому переводим задачу
# в бинарную для каждого класса и повторяем по всем классам
y_binary = y == label
X_filtered = tsfresh.select_features(X, y_binary)
relevant_features = relevant_features.union(set(X_filtered.columns))
self.relevant_features_ = relevant_features
return self
def transform(self, X, y=None):
return X[list(self.relevant_features_)]
Общее описание подхода к решению
Наше решение должно работать в режиме реального времени, поэтому тестовые данные мы будем проверять так: двигаемся по временному ряду скользящим окном размера window_size = 5 секунд с шагом тоже window_size (т.е. окна не пересекаются). И для каждого окна смотрим, есть ли внутри него изменение состояний погрузчика. Для удобства мы немного меняем наш таргет: будем предсказывать не состояние(-я) внутри окна, а их изменения.
Введем новый Enum:
class LoaderStateTransition(Enum):
EMPTY_NO_TRANSITION = 0
EMPTY_TO_LOADING = 1
LOADING_NO_TRANSITION = 2
LOADING_TO_LOADED = 3
LOADED_NO_TRANSITION = 4
LOADED_TO_UNLOADING = 5
UNLOADING_NO_TRANSITION = 6
UNLOADING_TO_EMPTY = 7
LOADING_TO_EMPTY = 8 # неудачная погрузка
Имена тут self-explanatory, например, EMPTY_NO_TRANSITION означает, что внутри окна есть только одно состояние – EMPTY, а EMPTY_TO_LOADING – что внутри окна состояние EMPTY в какой-то момент сменилось на LOADING. Код перевода исходных состояний LoaderState в переходы между состояниями LoaderStateTransition опустим, он тривиален.
Самая главная идея: используем несколько моделей машинного обучения. Каждая из этих моделей будет натренирована на смену состояний погрузчика в определенном месте окна. Напоминаю, что мы двигаемся по временному рядом скользящим окном размера window_size = 5 секунд.
Тренируем n_models = windows_size - 2 моделей следующим образом: если текущее окно не содержит изменения состояния погрузчика, скармливаем его для обучения всем моделям, иначе смотрим, в какой точке i окна происходит изменение состояния, и скармливаем фичи окна на обучение i-й модели.
В качестве базовой модели используем градиентный бустинг — XGBClassifier из xgboost.
Напишем удобный класс CompositePipeline, совместимый с sklearn, который будет внутри себя содержать по пайплайну для каждой модели:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.pipeline import Pipeline
from sklearn.utils.validation import check_X_y, check_array
from dataclasses import dataclass
from enum import Enum
from typing import Optional
@dataclass
class CompositePrediction:
label: int
prob: float
model_ind: Optional[int]
target_class: Optional[type[Enum]]
def as_enum(self) -> Optional[Enum]:
if self.target_class is not None:
return self.target_class(self.label)
return None
def __lt__(self, other):
return self.prob < other.prob
def __str__(self) -> str:
if (enum_obj := self.as_enum()) is not None:
name = enum_obj.name
else:
name = str(self.label)
return f"{name}: p={self.prob:0.4f}; model {self.model_ind}"
class CompositePipeline(BaseEstimator, ClassifierMixin):
def __init__(self, steps_info: list[tuple[str, type, dict]], n_models: int = 1, target_class: Optional[type[Enum]] = None):
self.steps_info = steps_info
self.n_models = n_models
self.target_class = target_class
def fit(self, X, y, **kwargs):
# Check that X and y have correct shape
check_X_y(X, y)
if self.n_models > 1:
model_indices = kwargs['model_indices']
model_indices = model_indices.values if hasattr(model_indices, 'columns') else model_indices
else:
model_indices = np.ones((X.shape[0], 1), dtype=int)
if model_indices.shape[1] != self.n_models:
raise ValueError("Number of columns in model_indices and n_models mismatch!")
if X.shape[0] != model_indices.shape[0]:
raise ValueError("Shapes of data and model_indices mismatch!")
# Create pipelines:
pipes = []
for _ in range(self.n_models):
steps = []
for (name, cls, step_kwargs) in self.steps_info:
estimator = cls(**step_kwargs)
steps.append((name, estimator))
pipe = Pipeline(steps)
pipes.append(pipe)
self.pipes = pipes
# Fit pipelines:
for i, pipe in enumerate(self.pipes):
# Отбираем все строки из X и эл-ты из y, соответствующие этой трубе:
mask_i = model_indices[:, i] == 1
X_i = X[mask_i]
y_i = y[mask_i]
pipe.fit(X_i, y_i)
self.classes_ = self.pipes[0][-1].classes_
self.n_features_in_ = X.shape[1]
return self
def predict(self, X) -> np.ndarray:
# Input validation
check_array(X)
# Собираем все предсказания от всех моделей
n_rows = X.shape[0]
n_classes = len(self.classes_)
all_predictions = np.zeros((self.n_models, n_rows, n_classes), dtype=object)
for model_ind, clf in enumerate(self.pipes):
model_probs = clf.predict_proba(X)
model_predictions = []
for row_probs in model_probs:
model_predictions.append([])
for label, prob in zip(self.classes_, row_probs):
prediction_obj = CompositePrediction(
label=label,
prob=prob,
model_ind=model_ind,
target_class=self.target_class
)
model_predictions[-1].append(prediction_obj)
all_predictions[model_ind] = model_predictions
# Для каждого класса берем максимальное его предсказание среди всех моделей:
predictions_table = np.sort(all_predictions, axis=0)[-1, :, :]
return predictions_table
Его использование выглядит так:
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from xgboost import XGBClassifier
pipe = CompositePipeline(steps_info=[
('tsfresh_feature_selector', FeatureSelector, {}),
('scaler', StandardScaler, {}),
('pca', PCA, {'n_components': 0.95}),
('model', XGBClassifier, {'objective': 'multi:softmax'})
], n_models=n_models, target_class=LoaderStateTransition)
pipe.fit(X_train, y_train, model_indices=model_indices_train)
y_pred = pipe.predict(X_test)
Здесь параметр model_indices в методе fit — это дата-фрейм или двумерный массив нулей и единиц, в котором i-я колонка содержит 1 в тех строках, на которых должна обучаться i-я модель, и 0 - во всех остальных строках (one-hot encoding).
Таким образом, для каждой строки тестовых данных (т.е. для каждого временного окна) мы с помощью CompositePipeline получаем некоторый набор прогнозов — по одному от каждой из наших моделей. Прогноз каждой модели для одной строки данных — это массив вероятностей для каждого возможного лейбла-таргета типа LoaderStateTransition. Оставим для каждого LoaderStateTransition наибольшее его предсказание среди всех опрошенных моделей (т.е. предсказание с наибольшей вероятностью). Таким образом, по мере прохождения n временных окон мы получим таблицу прогнозов из n строк (= число временных окон) и m=9 столбцов (по числу возможных LoaderStateTransition):
| EMPTY_NO_TRANSITION | EMPTY_TO_LOADING | LOADING_NO_TRANSITION | LOADING_TO_LOADED | LOADED_NO_TRANSITION | LOADED_TO_UNLOADING | UNLOADING_NO_TRANSITION | UNLOADING_TO_EMPTY | LOADING_TO_EMPTY | |
13:08:06-13:08:10 | 0.9935 | 0.0002 | 0.0002 | 0.0014 | 0.0246 | 0.0008 | 0.0025 | 0.0040 | 0.0002 | |
13:08:11-13:08:15 | 0.0091 | 0.9945 | 0.0017 | 0.0008 | 0.0052 | 0.0006 | 0.0009 | 0.0003 | 0.0003 | |
13:08:16-13:08:20 | 0.0054 | 0.0002 | 0.9987 | 0.0001 | 0.0019 | 0.0001 | 0.0001 | 0.0001 | 0.0000 | |
... |
|
|
|
|
|
|
|
|
|
Теория графов
Нам осталось понять, как выбрать «наилучшее» предсказание в каждой строке, полученной нами таблицы. На ум сразу приходит жадный алгоритм — а давайте просто для каждого окна (строки таблицы) брать прогноз самой уверенной модели, т.е. прогноз с наибольшим значением вероятности. К примеру, для первых трех строк таблицы выше цепочка прогнозов будет такая: EMPTY_NO_TRANSITION — EMPTY_TO_LOADING — LOADING_NO_TRANSITION.
Но вот беда: подобное решение будет плохим. Некоторые классы в нашей задаче плохо разделимы между собой, и модели будут их постоянно путать. В частности, класс EMPTY_NO_TRANSITION плохо различим с LOADING_TO_EMPTY.
Вот график настоящих (ground truth) состояний погрузчика в тестовых данных:
А вот результат работы жадного алгоритма (после конвертации типа таргета в исходный LoaderSate):
Разница по числу рейсов аж в 2 раза, что, конечно, никуда не годится.
Поэтому мы используем дополнительную информацию о задаче. А именно, используем ограничения по переходам состояний друг в друга. Например, очевидно состояние «Пустой» не может сразу перейти в состояние «Разгружается», минуя состояния «Загружается» и «Загружен».
Код для переходов между, извините за тавтологию, переходами LoaderStateTransition:
def get_possible_transitions(
as_int: bool = True
) -> dict[LoaderStateTransition, list[LoaderStateTransition]] | dict[int, list[int]]:
"""
Возвращает словарь возможных переходов состояний.
Ключ - из какого состояния переходим, значения - в какие состояния можем перейти.
"""
d = {
LoaderStateTransition.EMPTY_NO_TRANSITION: [
LoaderStateTransition.EMPTY_NO_TRANSITION,
LoaderStateTransition.EMPTY_TO_LOADING,
LoaderStateTransition.LOADING_NO_TRANSITION, # когда смена состояний происходит на границе окна
],
LoaderStateTransition.EMPTY_TO_LOADING: [
LoaderStateTransition.LOADING_NO_TRANSITION,
LoaderStateTransition.LOADING_TO_LOADED,
LoaderStateTransition.LOADING_TO_EMPTY,
],
LoaderStateTransition.LOADING_NO_TRANSITION: [
LoaderStateTransition.LOADING_NO_TRANSITION,
LoaderStateTransition.LOADING_TO_LOADED,
LoaderStateTransition.LOADING_TO_EMPTY,
LoaderStateTransition.LOADED_NO_TRANSITION, # когда смена состояний происходит на границе окна
],
LoaderStateTransition.LOADING_TO_LOADED: [
LoaderStateTransition.LOADED_NO_TRANSITION,
LoaderStateTransition.LOADED_TO_UNLOADING,
],
LoaderStateTransition.LOADED_NO_TRANSITION: [
LoaderStateTransition.LOADED_NO_TRANSITION,
LoaderStateTransition.LOADED_TO_UNLOADING,
LoaderStateTransition.UNLOADING_NO_TRANSITION, # когда смена состояний происходит на границе окна
],
LoaderStateTransition.LOADED_TO_UNLOADING: [
LoaderStateTransition.UNLOADING_NO_TRANSITION,
LoaderStateTransition.UNLOADING_TO_EMPTY,
],
LoaderStateTransition.UNLOADING_NO_TRANSITION: [
LoaderStateTransition.UNLOADING_NO_TRANSITION,
LoaderStateTransition.UNLOADING_TO_EMPTY,
LoaderStateTransition.EMPTY_NO_TRANSITION, # когда смена состояний происходит на границе окна
],
LoaderStateTransition.UNLOADING_TO_EMPTY: [
LoaderStateTransition.EMPTY_NO_TRANSITION,
LoaderStateTransition.EMPTY_TO_LOADING,
],
LoaderStateTransition.LOADING_TO_EMPTY: [
LoaderStateTransition.EMPTY_NO_TRANSITION,
LoaderStateTransition.EMPTY_TO_LOADING,
],
}
if as_int:
d = {key.value: [v.value for v in vals] for key, vals in d.items()}
return d
Итак, у нас есть таблица прогнозов. Нам нужно в каждой строке выбрать «наилучший» прогноз, и построить цепочку таких «наилучших» прогнозов от первой строки таблицы до последней через все промежуточные строки последовательно. Ничего внутри не ёкает? Да это ж классическая задача поиска «наилучшего» пути из теории графов! Граф у нас n-дольный, т.е. каждая строка таблицы содержит вершины графа, принадлежащие одной доли, и переходов между вершинами одной доли нет. Есть только переходы между долями, причем ребра-переходы будут ориентированными — мы можем перейти из первой доли-строки таблицы во вторую, но из второй строки-доли не можем перейти обратно во времени в первую строку. Ну и как было сказано выше, переходы возможны не между любыми состояниями, а только в соответствии с get_possible_transitions. Таким образом, в отличие от жадного алгоритма мы будем максимизировать вероятность не в каждой строке таблицы, а для пути в целом, учитывая ограничения.
Конкретный алгоритм поиска «наилучшего» пути в нашем случае — алгоритм Беллмана-Форда:
from collections import deque
def bellman_ford(adj_list: list[deque], start: int) -> tuple[list[float], list[int]]:
"""
Алгоритм Беллмана-Форда.
Строит "наилучший" путь от start до всех остальных вершин.
"""
n = len(adj_list)
dist = [float('-inf')] * n
pred = [-1] * n
dist[start] = 0
q = deque([start])
while len(q) > 0:
from_ = q.pop()
for to, d in adj_list[from_]:
if dist[to] < dist[from_] + d:
dist[to] = dist[from_] + d
pred[to] = from_
q.appendleft(to)
return dist, pred
Он использует представление графа в виде списка смежности adj_list, а формирование этого списка смежности из таблицы прогнозов инкапсулируем в отдельный класс PredictionsTable (жадный алгоритм реализован там же):
import numpy as np
from dataclasses import dataclass, field
from collections import deque
import operator
@dataclass
class PredictionsTable:
init_label: int
target_names: list[str]
table: np.ndarray
transitions: dict[int, list[int]] = field(default_factory=dict)
@property
def shape(self) -> tuple[int, int]:
n_cols = len(self.table[0]) if len(self.table) > 0 else 0
return len(self.table), n_cols
def __str__(self) -> str:
s = 'Columns:\n'
for i, name in enumerate(self.target_names):
s += f'\t{i} - {name}\n'
s += '\n'
for row in self.table:
s += ' '.join([f'{cp.prob:0.4f}' for cp in row]) + '\n'
n_rows, n_cols = self.shape
s += f'\n{n_rows} rows × {n_cols} columns'
return s
def get_best_path(self, method: str = 'bf') -> list[int]:
if method == 'bf': # Беллман-Форд
return self._get_best_path_bf()
elif method == 'greedy':
return self._get_best_path_greedy()
else:
raise ValueError("Unknown method for best path:", method)
def _get_best_path_bf(self) -> list[int]:
def _get_adj_list() -> list[deque[tuple[int, float]]]:
"""
Формирует список смежности вершин графа
"""
# Число вершин нашего графа = число всех прогнозов всех моделей + одно достоверное стартовое состояние
n_rows, n_cols = self.shape
n_predictions = 1 + n_cols * n_rows
adj_list = [deque() for _ in range(n_predictions)]
# Ребра из стартовой вершины:
for transition in self.transitions[self.init_label]:
d = self.table[0][transition].prob
to = 1 + transition
adj_list[0].append((to, d))
# Ребра из остальных вершин:
for i, row in enumerate(self.table[:-1]):
for j in range(len(row)):
from_ = 1 + i * n_cols + j
for transition in self.transitions[j]:
to = 1 + (i + 1) * n_cols + transition
d = self.table[i + 1][transition].prob
adj_list[from_].append((to, d))
return adj_list
def _restore_path(pred: list[int], end: int) -> list[int]:
"""
Восстанавливает путь по конечной вершине end и списку предыдущих вершин pred
"""
path = []
n_cols = self.shape[1]
v = end
while v != 0:
label = (v - 1) % n_cols
path.append(label)
v = pred[v]
return path[::-1]
adj_list = _get_adj_list()
dist, pred = bellman_ford(adj_list, start=0)
n_cols = self.shape[1]
end = len(dist[:-n_cols]) + np.array(dist[-n_cols:]).argmax()
best_path = _restore_path(pred, end)
return best_path
def _get_best_path_greedy(self) -> list[int]:
# Из каждой строки таблицы берем прогноз с максимальной вероятностью
path = []
for row in self.table:
best_pred = sorted(row, key=operator.attrgetter('prob'), reverse=True)[0]
path.append(best_pred.label)
return path
И код использования этого класса:
init_label = y_train[-1] # достоверное стартовое состояние
transitions = get_possible_transitions()
target_names = [s.name for s in LoaderStateTransition]
predictions_table = PredictionsTable(
init_label=init_label,
table=y_pred,
transitions=transitions,
target_names=target_names
)
best_path_bf = predictions_table.get_best_path(method='bf')
Вот результат работы Беллмана-Форда:
И число рейсов, и число неудачных погрузок полностью совпадает с ground truth.
Вот так комбинация машинного обучения и теории графов помогла решить эту задачу.
В настоящий момент мы уже проводим тесты алгоритма в условиях реальных данных на борту погрузчика, и в скором времени накопим достаточно аналитики, чтобы посчитать качество детекции рабочих циклов, а также разберём, в каких случаях даже человеку сложно понять, стоит засчитывать здесь рейс или нет.
Не отключитесь, подписывайтесь, чтобы не пропустить следующий материал по теме, если интересно. И конечно, будем рады ответить на ваши вопросы и услышать предложения и замечания.