Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Библиотека OpenCV состоит из нескольких тысяч функций и алгоритмов. В этой статье мы хотим вам рассказать о том, как обеспечивается гибкость оптимизаций алгоритмов компьютерного зрения в OpenCV для разных архитектур, операционных систем и рабочих окружений.
Мы рассмотрим абстракцию parallel_for_
и механизм Universal Intrinsics и то, как переиспользовать это в вашем проекте.
Статья основана на лекции, которую команда OpenCV читала в рамках зимнего лагеря по оптимизации от Intel в 2020 году. Также доступна видеозапись этой лекции.
OpenCV
В этом году OpenCV исполняется 20 лет. Не исключено, что некоторые участки кода библиотеки даже старше читающих эту статью :) Но несмотря на возраст, библиотека постоянно совершенствуется как пополнением новыми алгоритмами, так и оптимизациями существующих под обновлённое железо, новые архитектуры, новые способы компиляции и развертывания.
Несколько фактов:
- В год OpenCV скачивают более 3 миллионов раз (если учитывать только PyPI + SourceForge).
- Библиотека написана на C++, но имеет автоматические обертки в Python, Java, JavaScript, MATLAB, PHP, Go, C#,… Большинство из них вызывают нативные библиотеки, но даже и в случае, например, JavaScript, оптимизируя код на C++, мы получаем векторизованный и параллельный код на JS (подробнее далее).
- Модульность библиотеки позволяет разделять алгоритмы по областям применения. Модуль core является базовым, а значит, если в вашем проекте уже есть OpenCV, то вам доступны и те оптимизации, о которых рассказано в этой статье.
Сообщество OpenCV активно использует библиотеку на всевозможных конфигурациях и отсюда возникает вопрос, как обеспечить наиболее гибкий способ запускаться эффективно для всех, не переписывая все алгоритмы с нуля или обрамляя их #ifdef
с платформо-зависимыми оптимизациями. Кроме того, как мотивировать разработчиков приносить новые алгоритмы?
Компромисс есть и, наверное, это то состояние OpenCV на сегодняшний день, которое позволяет расширяться на новые горизонты. Так или иначе большая часть алгоритмов работает с массивами данных (многомерными в общем случае) — изображениями, масками, полями, тензорами и т.д. Базисной структурой данных является cv::Mat
— обертка над массивом данных определённого типа и размерностями:
cv::Mat mat(480, 640, CV_8UC3);
int rows = mat.rows; // 480
int cols = mat.cols; // 640
int channels = mat.channels(); // 3
uint8_t* data = mat.ptr<uint8_t>();
// or
std::vector<float> myData(10*11);
cv::Mat myMat(10, 11, CV_32FC1, myData.data());
Давайте посмотрим пример кода оператора выделения границ Прюитт с использованием cv::Mat
:
// | -1 0 +1 |
// Gx = | -1 0 +1 | * A
// | -1 0 +1 |
void prewitt_x(const Mat& src, Mat& dst) {
CV_Assert(src.type() == CV_8UC1);
Mat bsrc;
copyMakeBorder(src, bsrc, 1, 1, 1, 1, BORDER_REPLICATE);
dst.create(src.size(), CV_8UC1);
for (int y = 0; y < dst.rows; ++y)
for (int x = 0; x < dst.cols; ++x) {
dst.at<uchar>(y, x) = bsrc.at<uchar>(y , x+2) - bsrc.at<uchar>(y , x) +
bsrc.at<uchar>(y+1, x+2) - bsrc.at<uchar>(y+1, x) +
bsrc.at<uchar>(y+2, x+2) - bsrc.at<uchar>(y+2, x);
}
}
Это вариант матричного фильтра 3x3 для нахождения граней на изображении. Чтобы не усложнять код обработкой граничных случаев, а акцентировать внимание на оптимизации, добавим на исходное изображение по 1 пикселю на каждую из сторон.
При разрешении 1920x1080 алгоритм показывал около 6.13 миллисекунд на тестовом компьютере при подготовке доклада.
Естественным шагом является распараллеливание алгоритма. Но вспоминаем, что мы решаем не просто задачу о том, что нужно распараллелить алгоритм, а о том, как сделать это универсально, если мы хотим, чтобы как можно больше пользователей нашего функционала почувствовали прирост скорости. Так мы плавно дошли до первой идеи о том, как скрыть под капотом OpenCV знания о конфигурации системы и доступных параллельных фреймворках, не усложняя при этом алгоритм.
parallel_for_
OpenCV-шный parallel_for_
— это абстракция, способная во время компиляции разворачиваться в конкретный параллельный цикл в зависимости от того, под какую OS происходит компиляция и какие библиотеки для параллелизма доступны. Вот полный список вариантов:
- Intel Threading Building Blocks (TBB)
- OpenMP
- Apple GCD
- Windows RT concurrency
- Windows concurrency
- Pthreads
Для нашего примера — распараллелим по строчкам, что обычно и используется:
parallel_for_(Range(0, src.rows), [&](const Range& range) {
for (int y = range.start; y < range.end; ++y)
for (int x = 0; x < dst.cols; ++x) {
dst.at<uchar>(y, x) = bsrc.at<uchar>(y , x+2) - bsrc.at<uchar>(y , x) +
bsrc.at<uchar>(y+1, x+2) - bsrc.at<uchar>(y+1, x) +
bsrc.at<uchar>(y+2, x+2) - bsrc.at<uchar>(y+2, x);
}
});
Для тестовой машины на том же разрешении даёт 2.20ms (x2.78). Больше информации и примеров использования можно найти в документации или поиском по исходному коду.
Universal Intrinsics
Универсальные интринсики — встроенный в OpenCV механизм векторизации кода. Задумка заключается в том, чтобы предоставить единый интерфейс для наиболее общих инструкций разных платформ. Для использования в OpenCV, необходимо лишь подключить заголовочный файл:
#include <opencv2/core/hal/intrin.hpp>
Тогда векторизованный код может выглядеть таким образом:
void process(int* data, int len) {
const cv::v_int32x4 twos = cv::v_setall_s32(2);
int i = 0;
for (; i <= len - 4; i += 4) {
cv::v_int32x4 b0 = cv::v_load(&data[i]);
b0 *= twos;
v_store(&data[i], b0);
}
}
В примере, каждый элемент массива умножается на 2. Отметим важные моменты:
Двойки представлены в векторном виде, поскольку операции, в которых участвовали бы вектор и скаляр, не определены — их можно интерпретировать как операцию над первым элементом, так и над всеми.
Для операций над целочисленными векторами перегружены операторы *, +, -
. Для чисел с плавающей точкой, также перегружен оператор деления.
Поскольку в зависимости от типа данных и особенностей железа более оптимальным может быть своя ширина вектора, доступны также обобщенные vx_load
и соответствующие векторные типы без суффиксов: v_uint8
, v_int32
и так далее. Ширину вектора можно узнать из константы nlanes
для своего типа, например v_uint8::nlanes
.
В зависимости от способа сборки и возможностей исполняемой машины, универсальные интринсики раскрываются в следующие наборы инструкций:
- AVX / SSE (x86)
- NEON (ARM)
- VSX (PowerPC)
- MSA (MIPS)
- WASM (JavaScript)
Таким образом, одна реализация с помощью универсальных интринсиков позволяет сгенерировать оптимизированный код под все поддерживаемые платформы. Универсальность заключается в том, что для разных платформ одни и те же операции выражаются различными инструкциями или набором инструкций. OpenCV же предоставляет API скрывающий эти особенности.
С точки зрения оптимизации в масштабах библиотеки, выделяются два независимых этапа — написание алгоритмов с использованием универсальных интринсиков и добавление новых бэкендов для самого механизма. То есть если алгоритм уже был написан на универсальных интринсиках, введение оптимизаций для новой платформы в большинстве случаев обеспечивает улучшение производительности. Огромную роль в освоении новых платформ играют сторонние контрибьюторы, которые так или иначе заинтересованы в использовании OpenCV в своих конфигурациях. Например, VSX (OpenCV 3.3.1), MSA и WASM (OpenCV 4.1.2).
Со списком поддерживаемых операций можно ознакомиться в документации.
Взглянем на наш алгоритм с использованием Universal Intrinsics:
parallel_for_(Range(0, src.rows), [&](const Range& range) {
for (int y = range.start; y < range.end; ++y) {
const uint8_t* psrc0 = bsrc.ptr(y);
const uint8_t* psrc1 = bsrc.ptr(y + 1);
const uint8_t* psrc2 = bsrc.ptr(y + 2);
uint8_t* pdst = dst.ptr(y);
int x = 0;
for (; x <= dst.cols - v_uint8::nlanes; x += v_uint8::nlanes) {
v_uint8 res = v_add_wrap(v_sub_wrap(vx_load(psrc0+x+2), vx_load(psrc0+x)),
v_add_wrap(v_sub_wrap(vx_load(psrc1+x+2), vx_load(psrc1+x)),
v_sub_wrap(vx_load(psrc2+x+2), vx_load(psrc2+x)) ));
v_store(pdst + x, res);
}
for (; x < dst.cols; ++x) {
pdst[x] = psrc0[x + 2] - psrc0[x] +
psrc1[x + 2] - psrc1[x] +
psrc2[x + 2] - psrc2[x];
}
}
});
В данном примере мы также обрабатываем изображение параллельно полосками, однако внутри строки уже считаем не по одному элементу, а сразу векторами.
Обратите внимание на то, что на случай, если ширина изображения не делится на размерность вектора нацело, существует обработка хвоста. Кстати, когда мы экспериментировали с языком программирования Halide — подметили интересный подход к обработке хвоста путём смещения вектора с перекрытием:
<* * * *> * * * * * *
* * * * <* * * *> * *
* * * * * * <* * * *>
Это, с одной стороны, избавляет от написания похожих циклов-хвостов, с другой — не всегда применимо, например если алгоритм работает in-place и повторная обработка элементов недопустима.
Такой код на изображении 1920x1080 работает 0.2ms, что в 23.5 раз быстрее изначальных 6.13 ms.
Для сравнения, так бы выглядел тот же алгоритм, но с использованием нативных интринсиков:
for (int y = range.start; y < range.end; ++y) {
const uint8_t* psrc0 = bsrc.ptr(y);
const uint8_t* psrc1 = bsrc.ptr(y + 1);
const uint8_t* psrc2 = bsrc.ptr(y + 2);
uint8_t* pdst = dst.ptr(y);
int x = 0;
#if CV_AVX512_SKX
if (CV_CPU_HAS_SUPPORT_AVX512_SKX) {
for (; x <= dst.cols - 64; x += 64) {
__m512i vsrc0 = _mm512_sub_epi8(_mm512_loadu_epi8(psrc0 + x + 2), _mm512_loadu_epi8(psrc0 + x));
__m512i vsrc1 = _mm512_sub_epi8(_mm512_loadu_epi8(psrc1 + x + 2), _mm512_loadu_epi8(psrc1 + x));
__m512i vsrc2 = _mm512_sub_epi8(_mm512_loadu_epi8(psrc2 + x + 2), _mm512_loadu_epi8(psrc2 + x));
_mm512_storeu_epi8(pdst + x, _mm512_add_epi8(vsrc0, _mm512_add_epi8(vsrc1, vsrc2)));
}
}
#endif
#if CV_AVX2
if (CV_CPU_HAS_SUPPORT_AVX2) {
for (; x <= dst.cols - 32; x += 32) {
__m256i vsrc0 = _mm256_sub_epi8(_mm256_loadu_si256(psrc0 + x + 2), _mm256_loadu_si256(psrc0 + x));
__m256i vsrc1 = _mm256_sub_epi8(_mm256_loadu_si256(psrc1 + x + 2), _mm256_loadu_si256(psrc1 + x));
__m256i vsrc2 = _mm256_sub_epi8(_mm256_loadu_si256(psrc2 + x + 2), _mm256_loadu_si256(psrc2 + x));
_mm256_storeu_si256(pdst + x, _mm256_add_epi8(vsrc0, _mm256_add_epi8(vsrc1, vsrc2)));
}
}
#endif
#if CV_SSE2
for (; x <= dst.cols - 16; x += 16) {
__m128i vsrc0 = _mm_sub_epi8(_mm_loadu_si128((__m128i const*)(psrc0 + x + 2)), _mm_loadu_si128((__m128i const*)(psrc0 + x)));
__m128i vsrc1 = _mm_sub_epi8(_mm_loadu_si128((__m128i const*)(psrc1 + x + 2)), _mm_loadu_si128((__m128i const*)(psrc1 + x)));
__m128i vsrc2 = _mm_sub_epi8(_mm_loadu_si128((__m128i const*)(psrc2 + x + 2)), _mm_loadu_si128((__m128i const*)(psrc2 + x)));
_mm_storeu_si128(pdst + x, _mm_add_epi8(vsrc0, _mm_add_epi8(vsrc1, vsrc2)));
}
#elif CV_NEON
for (; x <= dst.cols - 16; x += 16) {
uint8x16_t vsrc0 = vsubq_u8(vld1q_u8(psrc0 + x + 2), vld1q_u8(psrc0 + x));
uint8x16_t vsrc1 = vsubq_u8(vld1q_u8(psrc1 + x + 2), vld1q_u8(psrc1 + x));
uint8x16_t vsrc2 = vsubq_u8(vld1q_u8(psrc2 + x + 2), vld1q_u8(psrc2 + x));
vst1q_u8(pdst + x, vaddq_u8(vsrc0, vaddq_u8(vsrc1, vsrc2)));
}
#elif CV_VSX
for (; x <= dst.cols - 16; x += 16) {
vec_uchar16 vsrc0 = vec_sub(ld(0, psrc0 + x + 2), ld(0, psrc0 + x));
vec_uchar16 vsrc1 = vec_sub(ld(0, psrc1 + x + 2), ld(0, psrc1 + x));
vec_uchar16 vsrc2 = vec_sub(ld(0, psrc2 + x + 2), ld(0, psrc2 + x));
st(vec_add(vsrc0, vec_add(vsrc1, vsrc2)), 0, pdst + x);
}
#elif CV_MSA
for (; x <= dst.cols - 16; x += 16) {
v16u8 vsrc0 = msa_subq_u8(msa_ld1q_u8(psrc0 + x + 2), msa_ld1q_u8(psrc0 + x));
v16u8 vsrc1 = msa_subq_u8(msa_ld1q_u8(psrc1 + x + 2), msa_ld1q_u8(psrc1 + x));
v16u8 vsrc2 = msa_subq_u8(msa_ld1q_u8(psrc2 + x + 2), msa_ld1q_u8(psrc2 + x));
msa_st1q_u8(pdst + x, msa_addq_u8(vsrc0, msa_addq_u8(vsrc1, vsrc2)));
}
#elif CV_WASM
for (; x <= dst.cols - 16; x += 16) {
v128_t vsrc0 = wasm_u8x16_sub(wasm_v128_load(psrc0 + x + 2), wasm_v128_load(psrc0 + x));
v128_t vsrc1 = wasm_u8x16_sub(wasm_v128_load(psrc1 + x + 2), wasm_v128_load(psrc1 + x));
v128_t vsrc2 = wasm_u8x16_sub(wasm_v128_load(psrc2 + x + 2), wasm_v128_load(psrc2 + x));
wasm_v128_store(pdst + x, wasm_u8x16_add(vsrc0, wasm_u8x16_add(vsrc1, vsrc2)));
}
#endif
for (; x < dst.cols; ++x) {
pdst[x] = psrc0[x + 2] - psrc0[x] +
psrc1[x + 2] - psrc1[x] +
psrc2[x + 2] - psrc2[x];
}
}
Практика
В качестве практического задания студентам предлагалось реализовать перекрестный оператор Робертса и выполнить как минимум два раунда оптимизации (с использованием parallel_for_
и универсальных интринсиков):
input: cv::Mat (single channel, uint8_t)
output: cv::Mat (single channel, int32_t)
out = (Gx)^2 + (Gy)^2, where
| +1 0 | | 0 +1 |
Gx = | 0 -1 | * A, Gy = | -1 0 | * A
Базовый проект с практикой доступен по ссылке: https://github.com/dkurt/cv_winter_camp_2020
А также слайды: https://gitpitch.com/dkurt/cv_winter_camp_2020
Желаем всем как минимум x2 в производительности и помните, что ускорение можно получать не только заменой железа но и за счет оптимизации софта.