Приручаем многопоточность в Node.js (часть 5: автомасштабирование под нагрузку)

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

В прошлых частях цикла мы:

  • рассмотрели базовые концепты работы с многопоточностью в JavaScript на примере среды Node.js;

  • научились формировать общую очередь и каналы обмена данными и сигналами, чтобы более эффективно управлять загрузкой потоков;

  • использовали разделяемую память и Atomics-операции как самое быстрое средство обмена большими блоками данных;

  • и создали отдельный поток-координатор, чтобы устранить негативное влияние синхронного кода в основном потоке исполнения на загрузку потоков вспомогательных.

В сегодняшней, заключительной, части я продемонстрирую, как все эти механики вместе позволяют сделать эффективный микросервис, автоматически подстраивающийся под изменения входящей нагрузки.

В данном случае эффективность - это не про максимально возможную скорость обработки каждой отдельной задачи, а про сбалансированное использование аппаратных ресурсов с учетом тех ограничений, на которые мы готовы пойти. Особенно актуально это для различных "облачных" размещений, где оплата идет за фактически потребленные CPU и RAM.

Передача контента сообщений

В предыдущей части мы научились быстро передавать блоки двоичных данных через разделяемую память в обрабатывающие их потоки с помощью потока-координатора, избавившего нас от ожидания исполнения синхронного кода в основном потоке:

Передача данных в поток в схеме с координатором
Передача данных в поток в схеме с координатором

Но, как правило, обрабатываемые сообщения состоят не только из двоичного контента, но и из некоторого объекта, который хочется иметь в доступе по итогу процессинга, а вот передавать его туда целиком вовсе незачем.

Доработаем схему из прошлой статьи, вынеся функционал промежуточного хранения сообщений и формирования единого блока двоичных данных в класс Coordinator, а управления обрабатывающими потоками и очередью - в WorkersPool:

Схема управления потоками
Схема управления потоками

В коде эта схема может выглядеть примерно так:

if (isMainThread) {
  const {Coordinator} = require(...);
  const coordinator = new Coordinator(__filename, ...);
  coordinator
    .on('online', () => {
      // отправка данных
      coordinator.postMessage(message);
    })
    .on('message', (result, message) => { // увязанные [по id] сообщение и результат
      // обработка полученного результата в паре с исходным сообщением
    });
}
else {
  const {workerType, port} = workerData;
  switch (workerType) {
    case 'coordinator':
      const {WorkersPool} = require(__filename, ...);
      const pool = new WorkersPool(...);
      break;
    case 'worker':
      const {shared} = workerData;
      // магия обработки данных в разделяемой памяти
      port.postMessage({id, ...});
      break;
  }
}

Представление данных

Для "увязки" обрабатываемых двоичных данные и исходного сообщения мы можем использовать автоинкрементный id. Но только лишь его - мало, чтобы корректно передать данные в поток.

Создавая блок разделяемой памяти SharedArrayBuffer, мы должны заранее предусмотреть максимальный размер возможного контента. Но работать-то мы можем с разными данными - то есть получается, что передавать размер нам надо тоже, иначе мы не сможем их правильно "выцепить" из разделяемой памяти.

Получается, что каждый отправляемый в поток блок данных должен иметь префикс, состоящий из служебного id и длины самих данных - для этого нам достаточно иметь Uint32Array из двух ячеек.

Но для "склейки" этих данных с двоичными данными из сообщения нам понадобится его Uint8Array-проекция и функция преобразования dataFunc, которая может быть примерно такой:

message => { // функция получения двоичных данных
  // объединяем префикс и данные
  const buf = Buffer.concat([ui8prefix, ...message.part]);
  // записываем в префикс результирующую длину
  buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
  // возвращаем итоговый двоичный контент
  return buf.buffer;
}

В свою очередь, на стороне обрабатывающего потока позиция id воспринимается не только как Int32-значение (уже знаковое!), но и как ячейка для ожидания блокировки:

const {shared} = workerData;

// [shared] = {lock:int32} + {size:uint32} + {data:uint8[]}
const lock = new Int32Array(shared, 0, 1);
const size = new Uint32Array(shared, Int32Array.BYTES_PER_ELEMENT, 1);
const data = new Uint8Array(shared, Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT);
// ...
  const id = lock[0];
  const messageData = data.subarray(0, size[0]);
Разное представление одних данных
Разное представление одних данных

Передача данных

Давайте детализируем схему выше с точки зрения путей обмена этими данными:

Движение данных в схеме с координатором
Движение данных в схеме с координатором
  1. в основном потоке приложения у нас появляется сообщение message, содержащее некоторый набор двоичных данных

  2. мы передаем его в Coordinator, где оно по автоинкрементному id заносится в промежуточное Map-хранилище messages

  3. с помощью вспомогательной функции dataFunc мы из сообщения извлекаем все необходимые двоичные данные и "склеиваем" их с префиксом, содержащим id и размер данных

  4. результирующий data-буфер мы по ссылке передаем в поток-координатор

  5. в координаторе данные должны или пройти через очередь (если потоки заняты), или сразу копируются в разделяемую память свободного потока

  6. поток, получая уведомление о новых данных, обрабатывает их и отправляет прямо в основной поток, ...

  7. ... где они по id связываются с исходным сообщением и передаются в .emit

Класс Coordinator
class Coordinator extends Worker {
  #messages = new Map(); // хранилище всех обрабатываемых сообщений

  #p32 = new Uint32Array(2);               // префикс данных = [messageID, dataSize]
  #p8  = new Uint8Array(this.#p32.buffer); // ... и его uint8-проекция
  #data = [this.#p8]; // массив для склейки частей в двоичный блок
  #message2data;      // функция получения двоичных данных из сообщения и помещения в массив

  #data2buffer() {
    // объединяем префикс и данные
    const buf = Buffer.concat(this.#data);
    // записываем в префикс результирующую длину
    buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
    // восстанавливаем состояние массива
    this.#data.length = 1;
    // возвращаем итоговый двоичный контент
    return buf.buffer;
  };

  constructor(filename, options) {
    // сигнальный канал
    const {port1, port2} = new MessageChannel();

    /* доинициализируем необходимые опции
      {
        workerData : {
          workerType : 'coordinator'
        , port       : port2
        }
      , transferList : [port2]
      }
    */
    ((options ??= {}).workerData ??= {}).workerType = 'coordinator';
    ((options ??= {}).workerData ??= {}).port = port2;
    ((options ??= {}).transferList ??= []).push(port2);
    super(filename, options);

    const {dataField, dataArray} = options;
    this.#p32[0] = 0; // ID

    this.#message2data = dataField
      ? message => this.#data.push(dataField(message))
      : message => this.#data.push(...dataArray(message));

    const messages = this.#messages;
    port1.on('message', ({threadId, port}) => {
      // ассоциируем открывшийся порт с конкретным потоком
      port.threadId = threadId;
      this.emit('port.open', port);

      port
        .on('message', result => {
          // из результата обработки по ID получаем исходное сообщение ...
          const message = messages.get(result.id);
          if (message) {
            messages.delete(result.id);
            // ... и передаем вместе с результатом
            this.emit('message', result, message);
          }
        })
        .on('close', () => {
          // при закрытии порта - отписываемся от него
          this.emit('port.close', port);
          port.removeAllListeners();
        });
    });
  }

  // передача двоичных данных в поток
  postMessage(message) {
    // сохраняем объект сообщения в хранилище
    this.#messages.set(this.#p32[0], message);
    // добавляем в общий массив для склейки одно или несколько полей
    this.#message2data(message);
    // формируем целевой контейнер данных
    const buffer = this.#data2buffer();
    // передаем двоичный контент с префиксом в поток по ссылке
    super.postMessage(buffer, [buffer]);
    // id = (id + 1) % 0x10000000
    this.#p32[0]++;
    this.#p32[0] &= 0x0FFFFFFF;
  }
}

На что тут стоит обратить внимание:

  • для связи между объектом Coordinator в основном потоке и потоком-координатором мы подняли служебный MessageChannel, состоящий из пары портов, как это было описано во второй части серии

  • именно по этому каналу поток-координатор передает нам порты для порождаемых им обрабатывающих потоков

  • по этим-то портам рабочие потоки и сбрасывают нам результаты своей деятельности прямо в основной поток

Работа с портами MessageChannel
Работа с портами MessageChannel

Разумное количество потоков

Так, с передачей данных в обрабатывающие потоки и обратно в основной поток - разобрались. Но сколько таких потоков нам необходимо иметь вообще?

Сначала обратим внимание на тот факт, что некоторых ресурсов стоит уже само создание потока. Если же оно включает в себя какую-то "тяжелую" инициализационную подготовку вроде прегенерации кэша внутри потока, то его старт может кратковременно занимать логическое ядро CPU даже на все 100%.

Отсюда следует четыре достаточно простых вывода:

  • потоки стоит порождать/убивать как можно реже

  • не стоит это делать одновременно - то есть допустима некоторая задержка на создание/уничтожение потока

  • как минимум, один поток обработки должен существовать всегда

  • потоков должно быть не больше, чем CPU-ядер

Раз уж мы упомянули тот факт, что внутри потока могут формироваться собственные данные или кэш, то становится выгодно иметь лишь минимально необходимое число потоков, насколько это вообще возможно, чтобы занимать как можно меньше памяти и улучшать долюcache hit.

Как упоминалось выше, для решения наших задач будет логично выделить класс WorkersPool, работающий внутри потока-координатора, который и будет осуществлять все управление потоками и очередью.

Критерий "необходимости"

Но как понять, что существующего количества потоков уже стало недостаточно, и все-таки необходимо создать еще один, несмотря на все сопутствующие издержки?

Фактически, нас интересует лишь один параметр работы нашего сервиса - чтобы все задачи обрабатывались достаточно быстро. А любая поступившая задача может быть либо сразу отдана на исполнение какому-то из потоков, либо поставлена в очередь.

Очередь, кстати, возьмем все ту же, из прошлой статьи, на основе кольцевого буфера:

this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);

Сама длина очереди "в штуках" нам не сильно о чем-то говорит, ведь мы не оцениваем ни скорость обработки, ни "вес" каждой отдельной задачи. Но если мы задумаемся, что нахождение задачи в очереди в течение 1 миллисекунды нас совсем не напрягает, а "зависание" в течение 1 секунды не устраивает совсем, то где-то на этом интервале [1ms .. 1s] найдем комфортное значение, сколько мы готовы достаточно безболезненно позволить задаче находиться в очереди - скажем, это будет 100ms.

То есть пока время пребывания самой старой задачи в очереди не перевалило за это значение, выгоднее еще немного подождать, чем стартовать еще один поток.

Чтобы не заниматься этой оценкой на каждой операции, повесим эту проверку на таймер прямо в конструкторе класса.

Класс WorkersPool (каркас)
class WorkersPool {
  #options; // кэш аргументов конструктора

  #mainWorker;              // основной рабочий поток
  #workersPool = [];        // пул свободных дополнительных потоков
  #workersSet  = new Set(); // полный набор всех активных потоков
  #workersRemain;           // потоков еще доступно к созданию

  #queue;      // очередь на кольцевом буфере

  #checking;   // признак активности проверки состояния

  #queueMore1; // "длина" очереди превышает возможности 1 потока
  #queueMoreW; //                                   ... всех существующих потоков

  #checkQueue(workers) {
    // если в очереди задач уже больше, чем потоков,
    // ... и самая старая задача висит дольше, чем обработали бы все активные потоки за время старта нового
    return this.#queue.length > workers && this.#queue[0].ts + this.#options['timeoutSpawn'] * workers < Date.now();
  }

  constructor(workerFile, options) {
    options['workerFile'] ??= workerFile;
    const {poolSize, queuePowMin, queuePowMax, timeoutIdle, intervalCheck} = options;
    this.#options = options;

    this.#workersRemain = poolSize;

    // очередь на кольцевом буфере
    this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);

    // главный вспомогательный поток существует сразу и всегда
    this.#mainWorker = this.#createWorker();
    
    // ...
    
    // периодическая проверка очереди и пула дополнительных потоков
    const pool = this.#workersPool;
    setInterval(() => {
      // пора ли порождать еще один поток?
      this.#queueMoreW = this.#checkQueue(this.#workersSet.size);
      // пора ли отдавать задачи дополнительным потокам?
      this.#queueMore1 = this.#queueMoreW || this.#checkQueue(1);
      // закрытие простаивающих дополнительных потоков
      // ...
    }, intervalCheck);
  }

  #createWorkerIfPossible() {
    // проверяем возможность и необходимость (по состоянмю очереди) запуска потока
    if (this.#workersRemain > 0 && this.#checkQueue(this.#workersSet.size)) {
      this.#createWorker();
    }
  }

  #createWorker() {
    this.#workersRemain--;
    this.#workersRemain = -this.#workersRemain; // wrap flag
    // ...

    worker.on('online', () => {
      // продолжим пробовать стартовать следующий после паузы на timeoutSpawn
      setTimeout(() => {
        this.#workersRemain = -this.#workersRemain; // wrap flag
        this.#createWorkerIfPossible();
      }, this.#options['timeoutSpawn']);
    });

    return worker;
  }

  #destroyWorker(worker) {
    // убираем поток из общего набора
    this.#workersSet.delete(worker);
    // завершаем сам поток
    worker.terminate();
    // исключаем воркер из пула, если он там был
    const idx = this.#workersPool.indexOf(worker);
    if (idx >= 0) {
      this.#workersPool.splice(idx, 1);
    }
    // увеличиваем количество доступных к запуску
    this.#workersRemain++;
  }
}

Тут мы использовали #workersRemain одновременно и как счетчик доступных к запуску, и как признак (отрицательное значение) наличия запускающегося в текущий момент потока.

Второе замечание касается использования для пула свободных именно массива, а не Set, поскольку нам важен приоритет потоков при раздаче им заданий на обработку.

Принцип распределения сообщений

Старшему досталась мельница, среднему – осел, ну а младшему пришлось взять себе кота.

[Шарль Перро, "Кот в сапогах"]

С учетом возможности существования собственного кэша в каждом из потоков, нам выгоднее использовать его как можно эффективнее - то есть отправлять задачи в поток, который существует дольше.

В идеале, таким потоком должен всегда оказываться главный рабочий поток, который, как мы договорились выше, существует всегда.

Но он может быть занят выполнением текущей задачи - тогда задача должна быть направлена в очередь, либо первому свободному потоку, если "длина" очереди уже заведомо превосходит возможности первого потока.

Если же ни одного такого потока не нашлось, и общая длина очереди превышает возможности всех потоков за нормативное время порождения потока, то пора задуматься о порождении нового потока:

this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
  this.#createWorkerIfPossible();
  this.#checking = null;
}, this.#options['intervalCheck']));

Такая конструкция не только заставляет ждать intervalCheck для попытки запуска потока, но и блокирует все остальные попытки на этом интервале.

Получение данных из очередей

На самом деле, когда мы говорим про очередь, надо не забывать, что их, фактически, две.

Когда у нас случается parentPort.on('message', message => ...), это означает, что нам отправили не только конкретное сообщение, но и что-то еще за ним могли успеть положить в очередь порта, которую мы можем сразу извлечь через receiveMessageOnPort, избегнув повторных вызов обработчика, как рассказывалось во второй части.

Получается вот такая нетривиальная схема доставки данных до процессинга в потоке:

"Вычерпываем" очередь порта при любой возможности
"Вычерпываем" очередь порта при любой возможности
.on('message') и .pulse()
  constructor(workerFile, options) {
    // ...
    
    // основная точка приема сообщений из main-потока
    parentPort.on('message', message => {
      // флаг возможности и необходимости продолжать отправку
      let processed;
      const {lock : [lockState], _pulse} = this.#mainWorker;
      if (lockState == THREAD_FREE) { // если основной рабочий поток свободен
        processed = _pulse(message);  // ... отдаем сообщение основному потоку
        message = undefined;          // ... и обнуляем
      }
      // продолжаем "дергать" потоки, пока хоть какая-то очередь (порта или своя) непуста
      // ... и еще кто-то остался кто-то свободный
      while (!processed) {
        const worker = this.#queueMore1 && this.#workersPool.pop();
        if (worker) {
          processed = worker._pulse(message);
          message &&= undefined;
        }
        else {
          // перекладываем всю очередь порта в свою очередь с меткой времени
          const now = Date.now();
          message ??= receiveMessageOnPort(parentPort)?.message;
          while (message) {
            this.#queue.push(message);
            message.ts = now;
            message = receiveMessageOnPort(parentPort)?.message;
          }
          // отложенная проверка необходимости запуска нового потока
          this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
            this.#createWorkerIfPossible();
            this.#checking = null;
          }, this.#options['intervalCheck']));
          return;
        }
      }
    });
  }
  // ...

  #pulseWorker(worker, message) {
    // если нам передали сообщение - пытаемся отдать его потоку
    let processed = !message || worker._send(message);
    // пока он говорит "уже все сразу обработал"...
    while (processed) {
      // извлекаем следующее сообщение из очереди порта основного потока
      const recv = receiveMessageOnPort(parentPort);
      // ... или своей локальной очереди
      message = recv?.message ?? this.#queue.shift();
      if (!message) {
        if (worker !== this.#mainWorker) {
          // когда все очереди закончились - возвращаем дополнительный поток в пул свободных
          const pool = this.#workersPool;
          !pool.includes(worker) && pool.push(worker);
        }
        return true;
      }
      // передаем данные в поток
      processed = worker._send(message);
    }
  };

Сама схема передачи в поток "по блокировке" та же самая, что мы использовали в прошлой части.

"Моя фамилия - Итого"

Осталось только свести все части воедино:

Полный код классов и тестового приложения
const {
  Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
, receiveMessageOnPort
, threadId
} = require('node:worker_threads');

const THREAD_FREE = -1;

class WorkersPool {
  #options; // кэш аргументов конструктора

  #mainWorker;              // основной рабочий поток
  #workersPool = [];        // пул свободных дополнительных потоков
  #workersSet  = new Set(); // полный набор всех активных потоков
  #workersRemain;           // потоков еще доступно к созданию

  #queue;      // очередь на кольцевом буфере

  #checking;   // признак активности проверки состояния

  #queueMore1; // "длина" очереди превышает возможности 1 потока
  #queueMoreW; //                                   ... всех существующих потоков

  #checkQueue(workers) {
    // если в очереди задач уже больше, чем потоков,
    // ... и самая старая задача висит дольше, чем обработали бы все активные потоки за время старта нового
    return this.#queue.length > workers && this.#queue[0].ts + this.#options['timeoutSpawn'] * workers < Date.now();
  }

  constructor(workerFile, options) {
    options['workerFile'] ??= workerFile;
    const {poolSize, queuePowMin, queuePowMax, timeoutIdle, intervalCheck} = options;
    this.#options = options;

    this.#workersRemain = poolSize;

    // очередь на кольцевом буфере
    this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);

    // главный вспомогательный поток существует сразу и всегда
    this.#mainWorker = this.#createWorker();

    // основная точка приема сообщений из main-потока
    parentPort.on('message', message => {
      // флаг возможности и необходимости продолжать отправку
      let processed;
      const {lock : [lockState], _pulse} = this.#mainWorker;
      if (lockState == THREAD_FREE) { // если основной рабочий поток свободен
        processed = _pulse(message);  // ... отдаем сообщение основному потоку
        message = undefined;          // ... и обнуляем
      }
      // продолжаем "дергать" потоки, пока хоть какая-то очередь (порта или своя) непуста
      // ... и еще кто-то остался кто-то свободный
      while (!processed) {
        const worker = this.#queueMore1 && this.#workersPool.pop();
        if (worker) {
          processed = worker._pulse(message);
          message &&= undefined;
        }
        else {
          // перекладываем всю очередь порта в свою очередь с меткой времени
          const now = Date.now();
          message ??= receiveMessageOnPort(parentPort)?.message;
          while (message) {
            this.#queue.push(message);
            message.ts = now;
            message = receiveMessageOnPort(parentPort)?.message;
          }
          // отложенная проверка необходимости запуска нового потока
          this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
            this.#createWorkerIfPossible();
            this.#checking = null;
          }, this.#options['intervalCheck']));
          return;
        }
      }
    });

    // периодическая проверка очереди и пула дополнительных потоков
    const pool = this.#workersPool;
    setInterval(() => {
      // пора ли порождать еще один поток?
      this.#queueMoreW = this.#checkQueue(this.#workersSet.size);
      // пора ли отдавать задачи дополнительным потокам?
      this.#queueMore1 = this.#queueMoreW || this.#checkQueue(1);
      // закрытие простаивающих дополнительных потоков
      if (pool.length) {
        const deadline = Date.now() - timeoutIdle;
        for (const {activity, lock : [lockState], _destroy} of pool) {
          activity < deadline && lockState == THREAD_FREE && _destroy();
        }
      }
    }, intervalCheck);
  }

  #createWorkerIfPossible() {
    // проверяем возможность и необходимость (по состоянмю очереди) запуска потока
    if (this.#workersRemain > 0 && this.#checkQueue(this.#workersSet.size)) {
      this.#createWorker();
    }
  }

  #createWorker() {
    this.#workersRemain--;
    this.#workersRemain = -this.#workersRemain; // wrap flag

    const shared = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT + this.#options['dataSize']);
    const data = new Uint8Array(shared);
    const lock = new Int32Array(shared, 0, 1); // lock/id - первые 4 байта data
    lock[0] = 0;

    // ждем, пока поток не проинициализируется и не сменит состояние, чтобы его "подергать"
    let worker;
    Atomics.waitAsync(lock, 0, 0)
      .value
      .then(() => lock[0] == THREAD_FREE && worker._pulse());
    
    const {port1, port2} = new MessageChannel();
    worker = new Worker(
      this.#options['workerFile']
    , {
        workerData : {
          workerType : 'worker'
        , shared
        , port       : port2 // передаем порт в рабочий поток
        }
      , transferList : [port2]
      }
    );

    // отправляем порт в основной поток, в Coordinator
    this.#options['port'].postMessage(
      {
        threadId : worker.threadId
      , port     : port1
      }
    , [port1]
    );

    this.#workersSet.add(worker);

    worker.id = worker.threadId;
    worker.port = port2;
    worker.lock = lock;
    worker.data = data;

    worker._pulse   = this.#pulseWorker.bind(this, worker);
    worker._destroy = this.#destroyWorker.bind(this, worker);
    worker._send    = this.#sendMessage.bind(this, worker);

    worker.on('online', () => {
      // продолжим пробовать стартовать следующий после паузы на timeoutSpawn
      setTimeout(() => {
        this.#workersRemain = -this.#workersRemain; // wrap flag
        this.#createWorkerIfPossible();
      }, this.#options['timeoutSpawn']);
    });

    return worker;
  }

  #pulseWorker(worker, message) {
    // если нам передали сообщение - пытаемся отдать его потоку
    let processed = !message || worker._send(message);
    // пока он говорит "уже все сразу обработал"...
    while (processed) {
      // извлекаем следующее сообщение из очереди порта основного потока
      const recv = receiveMessageOnPort(parentPort);
      // ... или своей локальной очереди
      message = recv?.message ?? this.#queue.shift();
      if (!message) {
        if (worker !== this.#mainWorker) {
          // когда все очереди закончились - возвращаем дополнительный поток в пул свободных
          const pool = this.#workersPool;
          !pool.includes(worker) && pool.push(worker);
        }
        return true;
      }
      // передаем данные в поток
      processed = worker._send(message);
    }
  };

  #sendMessage(worker, message) {
    const {lock, data, _pulse} = worker;
    // записываем из ui8-проекции в разделяемую память вместе с id
    data.set(new Uint8Array(message));
    // id - это первые 4 байта data
    const id = lock[0];
    // уведомляем поток
    Atomics.notify(lock, 0, 1);
    // фиксируем момент последней активности потока
    worker.activity = Date.now();
    // ждем, пока поток не обработает и не позовет нас
    const {value} = Atomics.waitAsync(lock, 0, id);
    if (value === 'not-equal') {
      // если он сразу успел обработать, то в него можно и дальше писать
      return true;
    }
    else {
      // если не сразу - "подергаем", когда освободится
      value.then(() => lock[0] == THREAD_FREE && _pulse());
    }
  }

  #destroyWorker(worker) {
    // убираем поток из общего набора
    this.#workersSet.delete(worker);
    // закрываем обе стороны ассоциированного MessageChannel
    worker.port.close();
    // завершаем сам поток
    worker.terminate();
    // исключаем воркер из пула, если он там был
    const idx = this.#workersPool.indexOf(worker);
    if (idx >= 0) {
      this.#workersPool.splice(idx, 1);
    }
    // увеличиваем количество доступных к запуску
    this.#workersRemain++;
  }
}

class Coordinator extends Worker {
  #messages = new Map(); // хранилище всех обрабатываемых сообщений

  #p32 = new Uint32Array(2);               // префикс данных = [messageID, dataSize]
  #p8  = new Uint8Array(this.#p32.buffer); // ... и его uint8-проекция
  #data = [this.#p8]; // массив для склейки частей в двоичный блок
  #message2data;      // функция получения двоичных данных из сообщения и помещения в массив

  #data2buffer() {
    // объединяем префикс и данные
    const buf = Buffer.concat(this.#data);
    // записываем в префикс результирующую длину
    buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
    // восстанавливаем состояние массива
    this.#data.length = 1;
    // возвращаем итоговый двоичный контент
    return buf.buffer;
  };

  constructor(filename, options) {
    // сигнальный канал
    const {port1, port2} = new MessageChannel();

    /* доинициализируем необходимые опции
      {
        workerData : {
          workerType : 'coordinator'
        , port       : port2
        }
      , transferList : [port2]
      }
    */
    ((options ??= {}).workerData ??= {}).workerType = 'coordinator';
    ((options ??= {}).workerData ??= {}).port = port2;
    ((options ??= {}).transferList ??= []).push(port2);
    super(filename, options);

    const {dataField, dataArray} = options;
    this.#p32[0] = 0; // ID

    this.#message2data = dataField
      ? message => this.#data.push(dataField(message))
      : message => this.#data.push(...dataArray(message));

    const messages = this.#messages;
    port1.on('message', ({threadId, port}) => {
      // ассоциируем открывшийся порт с конкретным потоком
      port.threadId = threadId;
      this.emit('port.open', port);

      port
        .on('message', result => {
          // из результата обработки по ID получаем исходное сообщение ...
          const message = messages.get(result.id);
          if (message) {
            messages.delete(result.id);
            // ... и передаем вместе с результатом
            this.emit('message', result, message);
          }
        })
        .on('close', () => {
          // при закрытии порта - отписываемся от него
          this.emit('port.close', port);
          port.removeAllListeners();
        });
    });
  }

  // передача двоичных данных в поток
  postMessage(message) {
    // сохраняем объект сообщения в хранилище
    this.#messages.set(this.#p32[0], message);
    // добавляем в общий массив для склейки одно или несколько полей
    this.#message2data(message);
    // формируем целевой контейнер данных
    const buffer = this.#data2buffer();
    // передаем двоичный контент с префиксом в поток по ссылке
    super.postMessage(buffer, [buffer]);
    // id = (id + 1) % 0x10000000
    this.#p32[0]++;
    this.#p32[0] &= 0x0FFFFFFF;
  }
}

const taskSize = 1 << 16;

if (isMainThread) {
  const {randomBytes} = require('node:crypto');
  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  console.log(Date.now(), 'Main : online');
  const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });
  console.log(Date.now(), 'Main : generated');

  const hashes = messages.map(() => undefined);
  let remain;

  const coordinator = new Coordinator(
    __filename
  , {
      dataField : message => message.data // двоичные данные лежат в одном поле
    }
  );
  coordinator
    .on('online', () => {
      console.log(Date.now(), 'Coordinator : online');
      // получаем список всех сообщений
      const fns = fs.readdirSync(dir)
        .sort()
        .map(fn => dir + sep + fn);
      remain = fns.length;

      fns.forEach((fn, id) => {
        const data = fs.readFileSync(fn); // тяжелый синхронный код
        coordinator.postMessage({id, fn, data});
      });
      console.log(Date.now(), 'Main : all send');
    })
    .on('port.open', port => {
      console.log(Date.now(), `Coordinator : dataPort open  = +1 worker [${port.threadId}]`);
    })
    .on('port.close', port => {
      console.log(Date.now(), `Coordinator : dataPort close = -1 worker [${port.threadId}]`);
    })
    .on('message', (result, message) => {
      hashes[message.id] = result.hash;
      if (!--remain) {
        console.log(Date.now(), 'Main : all recv');
        // подождем, пока все завершатся все желающие потоки
        setTimeout(() => {
          process.exit();
        }, 1000);
      }
    });
}
else {
  const {workerType, port} = workerData;
  switch (workerType) {
    case 'coordinator':
      // в потоке-координаторе нет активности, кроме управления пулом рабочих потоков
      const pool = new WorkersPool(
        __filename
      , {
          poolSize      : require('node:os').cpus().length // по количеству CPU-ядер
        , dataSize      : taskSize // предельный размер данных
        , port
        , queuePowMin   : 8  //   256
        , queuePowMax   : 16 // 65536
        , intervalCheck : 10
        , timeoutSpawn  : 100
        , timeoutIdle   : 10
        }
      );
      break;
    case 'worker':
      const {createHash} = require('node:crypto');

      const {shared} = workerData;

      // [shared] = {lock:int32} + {size:uint32} + {data:uint8[]}
      const lock = new Int32Array(shared, 0, 1);
      const size = new Uint32Array(shared, Int32Array.BYTES_PER_ELEMENT, 1);
      const data = new Uint8Array(shared, Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT);

      const processMessage = () => {
        // обрабатываем сообщение
        port.postMessage({
          id   : lock[0]              // связующий ID
        , hash : createHash('sha256').update(
            data.subarray(0, size[0]) // входящий контент
          ).digest('hex')
        });
        // уведомляем координатор о своей доступности
        lock[0] = THREAD_FREE;
        Atomics.notify(lock, 0, 1);
        // ... и возвращаемся к ожиданию блокировки
        wait();
      };

      const wait = () => {
        const {value} = Atomics.waitAsync(lock, 0, THREAD_FREE);
        if (value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          processMessage();
        }
        else {
          // иначе ждем разрешения Promise блокировки
          value.then(processMessage);
        }
      };

      // "освобождаем" блокировку и уведомляем координатора
      lock[0] = THREAD_FREE;
      Atomics.notify(lock, 0, 1);
      wait();

      // подвешиваем поток в бесконечное ожидание
      port.on('message', () => {});
      break;
  }
}

Если вы все сделали правильно, то должны увидеть примерно такой вывод:

1666250935654 Main : online
1666250940036 Main : generated
1666250940132 Coordinator : online
1666250941187 Main : all send
1666250941195 Coordinator : dataPort open  = +1 worker [2]
1666250941196 Coordinator : dataPort open  = +1 worker [3]
1666250941196 Coordinator : dataPort open  = +1 worker [4]
1666250941225 Coordinator : dataPort close = -1 worker [3]
1666250941244 Main : all recv
1666250941262 Coordinator : dataPort close = -1 worker [4]

В данном случае было запущено одновременно до 3 рабочих потоков: главный #2 и два вспомогательных #3 и #4. Один из них успел завершиться даже раньше, чем мы получили все результаты, а второй - чуть погодя.

На этом цикл статей про многопоточность в JavaScript/Node.js я завершаю, а вы прочитайте предыдущие части - не пожалеете!


  • часть 1: базовые концепты

  • часть 2: очередь, каналы и координатор

  • часть 3: разделяемая память, атомарные операции и блокировки

  • часть 4: координатор против синхронного кода

  • часть 5: автомасштабирование под нагрузку

Источник: https://habr.com/ru/company/tensor/blog/693738/


Интересные статьи

Интересные статьи

В первой и второй частях мы рассказали о ранних компьютерных вирусах 80-х, «эпохи классического киберпанка». К рубежу 90-х годов вирусы были у всех на слуху. Их боялись, о них писали панические ста...
Автор серии публикаций - Алексей Лазарев, руководитель Департамента защиты кибер-физических систем Компании «Актив»В предыдущей части статьи мы рассмотрели текущую ситуацию с криптографическими систем...
В этой части мы добавим несколько улучшений — упакованные дополненные последовательности и маскировка — к модели из предыдущего раздела. Упакованные дополненные последовательности использ...
Wi-Fi 6 (или 802.11 ax) новый стандарт беспроводных сетей. Новый формат, который создавался с целью исправить баги прошлых стандартов. К 2021му году у WiFi накопилос...
Привет, Хабровцы. В этой статье я хочу поделиться с вами немного своим опытом и показать вам мой простой алгоритм, который я придумал для создания Филворда. Под «Филвордом» я буду иметь ввиду эт...