Преамбула
Фирма, в которой я работаю, производит аппаратуру для нейрохирургов и нейрофизиологов, в основе которой лежит технология Deep Brain Stimulation. Если коротко, в живой мозг втыкается электрод, и нейрохирург может считывать из мозга сигнал или стимулировать клетки мозга разрядом тока. У технологии огромное будущее в деле лечения болезней (например, болезни Паркинсона, дистонии, эпилепсии) и в создании нейроинтерфейсов: нейропротезирование, в частности, восстановление зрения; аугментация мозга дополнительными устройствами, расширение возможностей мозга. Скажем, спецслужбы очень интересуются способом считывать и записывать информацию из зрительных и слуховых нервов, а также управлять движением животных, что позволит создать новый класс шпионов.
Для клинического применения в целях лечения тремора при болезни Паркинсона достаточно лишь нескольких внедряемых контактов (а некоторые нейрохирурги обходятся вообще одним). Но для исследователей, изучающих работу мозга, количество контактов имеет значение, и им нужно получать данные от как можно большего числа контактов одновременно и синхронно. Скажем, тысяча, или две тысячи внедрённых в мозг контактных площадок. Понятно, хотелось бы, чтобы и скорость была пристойной, — скажем, сорок тысяч замеров с каждого контакта в секунду. И чтоб резолюция была повыше, чтоб каждый замер был хотя бы в 32 бита, в формате float или Q. Получаем, что система производит порядка 320 мегабайт данных в секунду, так что всё это придётся обрабатывать.
Кроме считываемых непосредственно из мозга «чистых» данных, есть ещё данные отфильтрованные: результат применённых к замерам фильтров верхних и нижних частот. Используются фильтры высоких порядков, хотя бы четвёртого, реализованные в виде полиномов. Они применяются к каждому входящему замеру, увеличивая количество данных, о которых система должна заботиться, вчетверо, и поднимая количество генерируемых данных до 1,3 гигабайта в секунду. Но это уже не моя забота, потому что они генерируются из переданных мной данных после того, как я свою часть работы выполнил.
Результат всего этого счастья нужен в режиме реального времени и крайне важен. Пропускать нельзя ни одного замера, потому что основную работу по анализу данных исследователи выполняют после окончания эксперимента. Поэтому всё это богатство данных, помимо показа на экране, придётся записывать на жёсткий диск. Все 1,3 гигабайта данных в секунду. И потом читать в Matlab`е, NeuroExplorer`е или другой программе. Система, которая сохраняла 99,99999% данных, не прошла контроль качества и была забракована, потому что теряла до 13 тысяч замеров каждую секунду.
А вот теперь мы со всем этим попробуем взлететь.
Формулировка задачи
Имеется разработанная фирмой плата с контроллером FPGA, с одной стороны в которую воткнуты провода, идущие от мозга, (ну, на самом деле, от конвертеров, типа такого), а с другой есть выход PCIe. Этим выходом плата будет воткнута в порт PCIe на самом обычном, просто очень мощном, компьютере.
Мне предстояло создать драйвер, который получает данные потенциалов от этой нашей кастомной платы, обрабатывает на подключённой к тому же PCIe видеокарте (результат расчёта фильтров одного контакта не зависит от результатов другого; глупо не использовать для независимых расчётов процессор, который специально заточен на выполнение большого числа однотипных параллельных расчётов одновременно) и передаёт дальше в пользовательский интерфейс. И делать это надо очень-очень быстро, потому что новые пакеты с восемью замерами данных от каждого контакта приходят каждые 200 микросекунд. И, самое главное, сделать это надо под десятой Виндой, потому что нейрохирурги не знают и знать не хотят ничего, кроме Винды и Мака. Причём, судя по внешнему виду некоторых клиентов и адекватности высказываемых ими требований к программе, последнее слово предыдущего предложения можно писать с маленькой буквы.
Люди в теме уже сообразили, что речь идёт о hard realtime: гарантированный ответ на полученные данные в течение фиксированного времени, и неважно, что за дичь творится вокруг, без возможности задержаться или пропустить хотя бы один пакет. Эти же люди в теме уже покачали головой: творящаяся вокруг дичь — это Windows, hard realtime под Windows невозможен, Windows не операционная система реального времени. Более того, Windows не заточен под работу с квантами времени меньше миллисекунды, поэтому работа со скоростью «полный цикл обработки данных за 200 микросекунд» под Windows невозможна вдвойне.
Soft realtime отличается от hard realtime тем, что в soft иногда небольшие задержки всё-таки разрешены, при условии, что система от задержки очухается и успеет наверстать упущенное и разгрести данные, накопившиеся за время задержки, без потери производительности.
Существуют всякие расширения под Windows, которые позволяют частично имплементировать realtime. Например, операционные системы On Time, RTX64 от IntervalZero и прочие. Они сводятся к одной и той же идее: мы отбираем у Винды одно или несколько ядер и кусок памяти, делаем вид, что их в компьютере больше нет, и запускаем на них свою собственную операционку. После того, как этот монстр Франкенштейна раскочегарится и выйдет на рабочий режим, на компьютере будут работать одновременно две операционных системы: realtime и Windows. Между ними можно настроить общение. Это решение будет работать с двумя оговорками: во-первых, из-под Windows практически нет возможностей повлиять на то, что происходит внутри запущенной параллельно realtime ОС, (например, программы для неё надо компилировать при помощи проприетарной SDK; нельзя во время работы передать в неё свою собственную программу для обработки получаемых данных и запустить её), а во-вторых, стоимость этого решения, мягко говоря, неадекватна. Лицензия разработчика RTX64 стоит порядка 10 тысяч долларов, а за каждый экземпляр готового продукта, ушедшего клиенту (тому самому нейрохирургу), придётся заплатить ещё 500 долларов. Вдобавок к 600-долларовой лицензии на Винду, которую клиент тоже получит. Это выводит общую стоимость продукта из зоны конкурентоспособности и делает его финансово непривлекательным для потенциальных покупателей.
За десять тысяч долларов плюс неустановленное количество 500-долларовых лицензионных платежей я сам себе RTOS под Windows напишу, подумал я. И написал.
Применённые технические хитрости
Во-первых, нам нужно, чтобы как можно больше работы выполняла наша плата с FPGA на борту. Скажем, передачу данных лучше перевесить на неё: у неё DMA-контроллер точно ничем не будет занят, нет шансов, что, когда нам потребуется DMA-канал, Винда скажет нам в ответ «в очередь, линуксьи дети, в очередь!»
Как подключать FPGA к PCIe так, чтобы DMA писал данные, куда надо, это совсем-совсем отдельная тема, которая выходит за рамки данной статьи. Скажу только, что FPGA должен быть сконфигурирован как PCIe Endpoint, потому что компьютер остаётся Root Complex, — ему ведь ещё видеоадаптером управлять. При этом, раз DMA запускается платой, то и трансляция адресов должна выполняться на плате. И тут возникает вопрос: а куда плата будет писать? Изнутри Windows я могу работать только с виртуальными адресами. Даже если я выделю реальную память при помощи MmAllocateContiguousMemory, я получу только виртуальный адрес, достучаться до которого плата не сможет.
Так что совсем без решений Франкенштейна обойтись не удалось. Я резервирую кусок физической памяти на компьютере для использования только нашим устройством, выполнив в командной строке от Администратора команду следующего вида:
bcdedit /set removememory Х
(Х — сколько мегабайт зарезервировать)Таким образом, последние мегабайты скрыты от Windows, и обращаться к ним Винда не может. Помимо гарантии отсутствия столкновений на memory bus, таким образом решается ещё несколько проблем, в частности, отсутствует нужда в синхронизации доступа, что лишает меня необходимости использовать долгие и медленные семафоры и мьютексы. (Синхронизацию между записью данных в память и чтением можно осуществлять по времени: пусть плата пишет в пять буферов с разницей в 200 микросекунд; зная, что в нулевой буфер она писала в целое число миллисекунд, я буду читать буферы с отставанием на один: в целую миллисекунду — четвёртый, в миллисекунду и 200 микросекунд — нулевой, в миллисекунду и четыреста микросекунд — первый, и так далее. Как синхронизировать время на уровне микросекунд между двумя устройствами — задача, при наличии канала связи между ними, решаемая).
Драйвер, который будет читать данные из зарезервированной памяти, бежит строго на одном ядре. Для этого я меняю его привязку к процессору:
/* * The ID of the PCI driver CPU core. Starting from 0. */ static constexpr USHORT DRIVER_CPU_ID = 3; . . . . // Set the thread to run on specific processor KAFFINITY affinity = 1ULL << (DRIVER_CPU_ID); KeSetSystemAffinityThread(affinity);
— и поднимаю его приоритет, но не до высшего, а до того, который чуть ниже. На самом высшем приоритете некоторые системные функции не работают, и критические системные задачи, которые бегут на таком же приоритете, не будут выполняться:
// Set the thread priority to the highest available -1 // Тhe "-1" is because running for a long time in HIGH_PRIORITY // "starves" important system tasks which run in HIGH_PRIORTY KeSetPriorityThread(PsGetCurrentThread(), HIGH_PRIORITY - 1);
Но этого недостаточно. Нужно не только чтобы этот процесс бежал на одном ядре, но и чтобы никакой другой процесс на этом ядре не бежал. Для этого я поднимаю приоритет прерываний, которые могут прервать выполнение моего процесса (
KIRQL
), до максимального (DISPATCH_LEVEL
):KIRQL oldIrql; KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);
Однако процесс не может всё время бежать с запретом на любые прерывания, Винда за этим строго следит и может наглеца прибить. Поэтому периодически я понижаю приоритет прерываний, которым разрешаю свой процесс, гхм, прерывать. Чисто формально, но всё-таки:
// It's important that we don't stay at DISPATCH_LEVEL for too long // so we record the last tick we were at passive, and every once in // a while lower the KIRQL static constexpr ULONG64 MS_ALLOWED = 50; LARGE_INTEGER freq{}; LONGLONG lastPassiveTick = 0; . . . . . . KeQueryPerformanceCounter(&freq); timePassed = ((KeQueryPerformanceCounter(nullptr).QuadPart - lastPassiveTick) * 1000ULL) / freq.QuadPart; if (timePassed >= MS_ALLOWED) { yieldProcessor(); lastTickAtPassive = KeQueryPerformanceCounter(nullptr).QuadPart; } /* Yield Processor means lowering to PASSIVE_LEVEL and then raising back * to DISPATCH_LEVEL. It allows other important tasks to run in between, * if they are fast enough. */ void yieldProcessor() { KIRQL oldIrql; KeLowerIrql(PASSIVE_LEVEL); KeRaiseIrql(DISPATCH_LEVEL, &oldIrql); }
А теперь самое весёлое.
При инициализации драйвера я прохожу по всем имеющимся в операционной системе процессам и меняю их привязку к процессору:
namespace accelerator {
class IAccelerator {
public:
explicit IAccelerator() = default;
virtual void revert() = 0;
virtual void accelerate() = 0;
virtual ~IAccelerator() = default;
};
}
namespace accelerator {
const std::vector<std::wstring> DEFAULT_BLACKLIST_PROCESSES = {
L"system",
L"system.exe",
L"winlogon.exe"
};
class AffinitySetter : public IAccelerator {
public:
/**
* Sets the processor affinity of all processes.
*
* Affinity is reset upon reseting the computer.
*
* @param activeCpuIdentifiers The cpu identifiers which should NOT be used by any process.
* @param blacklistProcesses A list of processes that should not be altered.
*
*/
explicit AffinitySetter(std::vector<uint8_t> activeCpuIdentifiers,
std::vector<std::wstring> blacklistProcesses = DEFAULT_BLACKLIST_PROCESSES);
virtual void revert();
virtual void accelerate();
virtual ~AffinitySetter() = default;
private:
ULONG_PTR getAffinityMaskWithoutBlackList(ULONG_PTR maskLimit);
std::vector<uint8_t> m_activeCpuIdentifiers;
std::vector<std::wstring> m_blacklistProcesses;
};
}
. . . . . . .
std::vector<std::unique_ptr<accelerator::IAccelerator>> accelerators;
auto affinitySetter = std::make_unique<accelerator::AffinitySetter>(
std::vector<uint8_t>({ DRIVER_CPU_ID }));
accelerators.push_back(std::move(affinitySetter));
for (auto& accelerator : accelerators) {
accelerator->accelerate();
}
Но и это ещё не всё. Мало позаботиться о тех процессах, которые уже есть, надо ещё позаботиться о тех, которые пользователь создаcт в будущем. Для этого я регистрирую два системных коллбэка, на создание процесса и на создание потока; они вызываются для каждого нового процесса и потока, и я меняю их привязку к процессору:
/* * We want to keep this core to ourself, so register a callback for each * process and thread created. At this callback we change their affinity * (the core they can run on) to be different from our core */ if (!NT_SUCCESS(PsSetCreateProcessNotifyRoutine(newProcessCreated, FALSE))) { DEBUG_TRACE("PsCreateProcessNotifyRoutine failed"); COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL); } FINALLY([&guardActivator]() { if (guardActivator) { PsSetCreateProcessNotifyRoutine(newProcessCreated, TRUE); } }); if (!NT_SUCCESS(PsSetCreateThreadNotifyRoutine(newThreadCreated))) { DEBUG_TRACE("PsCreateProcessNotifyRoutine failed"); COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL); } FINALLY([&guardActivator]() { if (guardActivator) { PsRemoveCreateThreadNotifyRoutine(newThreadCreated); } }); . . . . . . void newProcessCreated( HANDLE ParentId, HANDLE ProcessId, BOOLEAN Create ) { UNREFERENCED_PARAMETER(ParentId); if (Create) { KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID))); KAFFINITY maximumAffinity = KeQueryActiveProcessors(); affinity &= maximumAffinity; // Get process handle by id HANDLE processHandle; OBJECT_ATTRIBUTES objectAttributes{ 0 }; InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL); CLIENT_ID clientid{ 0 }; clientid.UniqueProcess = ProcessId; auto status = ZwOpenProcess(&processHandle, GENERIC_ALL, &objectAttributes, &clientid); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status); return; } FINALLY([&processHandle]() { ZwClose(processHandle); }); // Set the process affinity by handle DEBUG_TRACE("Will set process affinity: %d for process: %d", affinity, ProcessId); if (affinity) { status = ZwSetInformationProcess(processHandle, ProcessAffinityMask, &affinity, sizeof(affinity)); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwSetInformationProcess failed getting process affinity for pid %d with status %d", ProcessId, status); return; } } } } void newThreadCreated( HANDLE ProcessId, HANDLE ThreadId, BOOLEAN Create ) { if (Create) { // Thread affinity should eventually be all cpus except our own. KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID))); KAFFINITY maximumAffinity = KeQueryActiveProcessors(); affinity &= maximumAffinity; // Get process handle by id HANDLE processHandle; OBJECT_ATTRIBUTES objectAttributes{ 0 }; InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL); CLIENT_ID clientid{ 0 }; clientid.UniqueProcess = ProcessId; auto status = ZwOpenProcess(&processHandle, GENERIC_READ, &objectAttributes, &clientid); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status); return; } FINALLY([&processHandle]() { ZwClose(processHandle); }); // Get the process affinity by handle PROCESS_BASIC_INFORMATION processInformation; ULONG returnLength; status = ZwQueryInformationProcess(processHandle, ProcessBasicInformation, &processInformation, sizeof(processInformation), &returnLength); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwQueryInformationProcess failed getting process for pid %d with status %d", ProcessId, status); return; } // Reduce affinity to by subset of process affinity &= processInformation.AffinityMask; // Get thread handle by id HANDLE threadHandle; objectAttributes = { 0 }; InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL); clientid = { 0 }; clientid.UniqueThread = ThreadId; status = ZwOpenThread(&threadHandle, GENERIC_ALL, &objectAttributes, &clientid); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwOpenThread failed getting thread for tid %d with status %d", ProcessId, status); return; } FINALLY([&threadHandle]() { ZwClose(threadHandle); }); // Set the thread affinity by handle DEBUG_TRACE("Will set thread affinity: %d for thread: %d", affinity, ThreadId); if (affinity) { status = ZwSetInformationThread(threadHandle, ThreadAffinityMask, &affinity, sizeof(affinity)); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwSetInformationThread failed getting thread affinity for tid %d with status %d", ProcessId, status); return; } } } }
Надо только не забыть убрать эти коллбэки при окончании работы.
Заключение
По факту, я реализовал систему, работающую в реальном времени, внутри Windows. Техника, в общем-то, такая же, как у коммерческих решений типа вышеупомянутой On Time: я забираю под свои цели ядро и часть памяти и не позволяю Windows добираться до них и мешать мне. Но есть и отличие: моё решение работает внутри Windows, в пространстве ядра, и позволяет пользоваться всеми преимуществами операционной системы. Я не ограничен в общении с остальными программами операционной системы и могу использовать весь набор средств для межпроцессорного взаимодействия. Более того, я могу вернуть занятое драйвером ядро обратно Windows в любой момент, достаточно только убрать мои коллбэки и пройтись по процессам, исправляя их привязки.
Время обработки одного пакета данных при таких условиях не превышает 155 микросекунд, включая добавление заголовков к каждому пакету данных. Затем данные передаются из зарезервированной памяти в программу обработки, которая уже заботится о передаче данных в GPU, занимается показом всего этого богатства на экране и сохранением на жёсткий диск. Время передачи данных из платы в память компьютера здесь не учитывается, потому что я начинаю работать только после того, как данные окажутся в памяти.