Использование Atomics.wait(), Atomics.notify() и Atomics.waitAsync()

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Статические методы Atomics.wait() и Atomics.notify() представляют собой низкоуровневые примитивы синхронизации, которые можно применять для реализации мьютексов и других подобных механизмов. Но, так как метод Atomics.wait() является блокирующим, его нельзя вызывать в главном потоке (если попытаться это сделать — будет выдана ошибка TypeError).

Движок V8, начиная с версии 8.7, поддерживает неблокирующий вариант Atomics.wait(), называемый Atomics.waitAsync(). Этим новым методом можно пользоваться в главном потоке.



Сегодня мы расскажем о том, как применить эти низкоуровневые API для создания мьютекса, который может работать и в синхронном режиме (в потоках воркеров) и асинхронно (в потоках воркеров или в главном потоке).

Atomics.wait() и Atomics.waitAsync()


Методы Atomics.wait() и Atomics.waitAsync() принимают следующие параметры:

  • buffer: массив типа Int32Array или BigInt64Array, в основе которого лежит SharedArrayBuffer.
  • index: действительный индекс элемента в массиве.
  • expectedValue: значение, которое, как мы ожидаем, должно быть представлено в памяти, в том месте, которое описано с помощью buffer и index.
  • timeout: тайм-аут в миллисекундах (необязательный параметр, по умолчанию установлен в Infinity).

Atomics.wait() возвращает строку. Если в указанном месте памяти не оказывается ожидаемого значения — Atomics.wait() немедленно завершает работу, возвращая строку not-equal. В противном случае поток блокируется. Для того чтобы блокировка была бы снята, должно произойти одно из следующих события. Первое —  это вызов из другого потока метода Atomics.notify() с указанием того места в памяти, которое интересует метод Atomics.wait(). Второе — это истечение тайм-аута. В первом случае Atomics.wait() возвратит строку ok, во втором — строковое значение timed-out.

Метод Atomics.notify() принимает следующие параметры:

  • typedArray: массив типа Int32Array или BigInt64Array, в основе которого лежит SharedArrayBuffer.
  • index: действительный индекс элемента в массиве.
  • count: количество агентов, ожидающих уведомления (необязательный параметр, по умолчанию установлен в Infinity).

Метод Atomics.notify() уведомляет указанное количество агентов, ожидающих уведомления по адресу, описываемому typedArray и index, обходя их в порядке FIFO-очереди. Если было сделано несколько вызовов Atomics.wait() или Atomics.waitAsync(), наблюдающих за одним и тем же местом в памяти, то все они оказываются в одной и той же очереди.

В отличие от метода Atomics.wait(), метод Atomics.waitAsync() сразу же возвращает значение в место вызова. Это может быть одно из следующих значений:

  • { async: false, value: 'not-equal' } — если указанное место в памяти не содержит ожидаемого значения.
  • { async: false, value: 'timed-out' } — только в тех случаях, когда тайм-аут установлен в 0.
  • { async: true, value: promise } — в остальных случаях.

Промис, по прошествии некоторого времени, может быть успешно разрешён строковым значением ok (если был вызван метод Atomics.notify(), которому переданы сведения о том месте в памяти, которое было передано Atomics.waitAsync()). Он может быть разрешён и со значением timed-out. Этот промис никогда не отклоняется.

В следующем примере продемонстрированы основы использования Atomics.waitAsync():

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ тайм-аут (необязательно)
//                                     |  ^ ожидаемое значение
//                                     ^ индекс

if (result.value === 'not-equal') {
  // Значение в SharedArrayBuffer отличается от ожидаемого.
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /* агента уведомили */ }
      else { /* истёк тайм-аут */ }
    });
}

// В этом или в другом потоке:
Atomics.notify(i32a, 0);

Теперь давайте поговорим о том, как создать мьютекс, которым можно пользоваться и в синхронном, и в асинхронном режимах. Надо отметить, что реализация синхронной версии мьютекса ранее уже обсуждалась. Например — в этом материале.

В этом примере мы не будем использовать параметр timeout при вызове Atomics.wait() и Atomics.waitAsync(). Этот параметр может быть использован для реализации условных конструкций, связанных с тайм-аутом.

Наш класс AsyncLock, представляющий мьютекс, работает с буфером SharedArrayBuffer и реализует следующие методы:

  • lock(): блокирует поток до того момента, пока у нас не появится возможность захватить мьютекс (применим только в потоке воркера).
  • unlock(): освобождает мьютекс (этот — противоположность lock()).
  • executeLocked(callback): пытается захватить блокировку, не блокируя при этом поток. Этот метод может быть использован в главном потоке. Он планирует выполнение коллбэка на тот момент, когда мы сможем захватить блокировку.

Взглянем на то, как могут быть реализованы эти методы. Объявление класса включает в себя константы и конструктор, который принимает буфер SharedArrayBuffer.

class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}

Здесь элемент i32a[0] содержит значение LOCKED или UNLOCKED. Он, кроме того, представляет то место в памяти, которое интересует Atomics.wait() и Atomics.waitAsync(). Класс AsyncLock обеспечивает следующие базовые возможности:

  1. Если i32a[0] == LOCKED и поток оказывается в состоянии ожидания (после вызова Atomics.wait() или Atomics.waitAsync()), наблюдая за i32a[0], он, в итоге, будет уведомлён.
  2. После того, как поток получит уведомление, он попытается захватить блокировку. Если ему это удастся, то, он, когда будет освобождать блокировку, вызовет Atomics.notify().

Синхронные захват и освобождение блокировки


Рассмотрим код метода lock(), который можно вызывать только из потока воркера.

lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /* Старое значение >>> */  AsyncLock.UNLOCKED,
                        /* Новое значение >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< значение, ожидаемое в начале работы
  }
}

Когда из потока вызывается метод lock(), сначала он пытается захватить блокировку, используя Atomics.compareExchange() для изменения состояния блокировки с UNLOCKED на LOCKED. Метод Atomics.compareExchange() пытается выполнить атомарную операцию изменения состояния блокировки, он возвращает исходное значение, находящееся в заданной области памяти. Если исходным значением было UNLOCKED, благодаря этому мы узнаем о том, что изменение состояния прошло успешно, и о том, что поток захватил блокировку. Ничего больше делать не нужно.

Если же Atomics.compareExchange() не смог изменить состояние блокировки, это значит, что блокировку удерживает другой поток. В результате поток, из которого вызван метод lock(), пытается воспользоваться методом Atomics.wait() для того чтобы дождаться момента освобождения блокировки другим потоком. Если в интересующей нас области памяти всё ещё хранится ожидаемое значение (в нашем случае — AsyncLock.LOCKED), то вызов Atomics.wait() заблокирует поток. Возврат из Atomics.wait() произойдёт только тогда, когда другой поток вызовет Atomics.notify().

Метод unlock() освобождает блокировку, устанавливая её в состояние UNLOCKED, и вызывает Atomics.notify() для того чтобы уведомить агентов, ожидающих снятия этой блокировки. Предполагается, что операция изменения состояния блокировки всегда завершается успешно. Это так из-за того, что поток, выполняющий эту операцию, удерживает блокировку. Поэтому ничто другое в это время не должно вызывать метод unlock().

unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /* старое значение >>> */  AsyncLock.LOCKED,
                      /* новое значение >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

В типичном случае всё происходит так: блокировка свободна и поток T1 захватывает её, меняя её состояние с помощью Atomics.compareExchange(). Поток T2 пытается захватить блокировку, вызывая Atomics.compareExchange(), но не может изменить её состояние. Затем T2 вызывает Atomics.wait(), этот вызов блокирует поток. Через некоторое время поток T1 освобождает блокировку и вызывает Atomics.notify(). Это приводит к тому, что вызов Atomics.wait() в T2 возвращает ok и поток T2 выходит из блокировки. После этого T2 пытается захватить блокировку снова. На этот раз ему это удаётся.

Тут могут возникнуть два особых случая. Их разбор призван продемонстрировать причины того, что Atomics.wait() и Atomics.waitAsync() проверяют наличие конкретного значения по заданному индексу элемента массива. Вот эти случаи:

  • T1 удерживает блокировку, а T2 пытается её захватить. Сначала T2 пытается изменить состояние блокировки, пользуясь Atomics.compareExchange(), но ему это не удаётся. Но потом T1 освобождает блокировку до того, как T2 успевает вызвать Atomics.wait(). А уже после этого T2 вызывает Atomics.wait(), откуда тут же происходит возврат значения not-equal. В подобном случае T2 переходит на следующую итерацию цикла и снова пытается захватить блокировку.
  • T1 удерживает блокировку, а T2 вызывает Atomics.wait() и ожидает её освобождения. T1 освобождает блокировку, T2 активируется (осуществляется возврат из Atomics.wait()) и пытается выполнить операцию Atomics.compareExchange() для захвата блокировки. Но другой поток, T3, оказался быстрее. Он уже успел сам захватить эту блокировку. В результате вызов Atomics.compareExchange() не позволяет T2 захватить блокировку. После этого T2 снова вызывает Atomics.wait() и оказывается заблокированным до того момента, пока T3 не освободит блокировку.

Последний особый случай демонстрирует тот факт, что наш мьютекс работает «нечестно». Может случиться так, что поток T2 ожидал освобождения блокировки, но T3 успел захватить её немедленно после её освобождения. Реализация блокировки, более подходящая для реального применения, может использовать несколько состояний блокировки, существующих для того чтобы различать ситуации, в которых блокировка была просто «захвачена», и в которых «при захвате произошёл конфликт».

Асинхронный захват блокировки


Неблокирующий метод executeLocked() можно, в отличие от метода lock(), вызывать из главного потока. Он получает, в качестве единственного параметра, коллбэк, и планирует вызов коллбэка после успешного захвата блокировки.

executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /* старое значение >>> */  AsyncLock.UNLOCKED,
                          /* новое значение >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ значение, ожидаемое в начале работы
      await result.value;
    }
  }

  tryGetLock();
}

Внутренняя функция tryGetLock() сначала, как и прежде, пытается захватить блокировку с помощью Atomics.compareExchange(). Если вызов этого метода приводит к успешному изменению состояния блокировки, функция может вызвать коллбэк, а после этого — освободить блокировку и завершить работу.

Если вызов Atomics.compareExchange() захватить блокировку не позволил, нам нужно попытаться сделать это снова, в тот момент, когда блокировка, возможно, будет свободна. Но мы не можем заблокировать поток и ждать освобождения блокировки. Вместо этого мы планируем новую попытку захвата блокировки с использованием метода Atomics.waitAsync() и возвращаемого им промиса.

Если нам удалось выполнить метод Atomics.waitAsync(), то возвращённый этим методом промис разрешится тогда, когда поток, который удерживал блокировку, вызовет Atomics.notify(). После этого поток, который хотел захватить блокировку, как и прежде, снова пытается это сделать.

Тут возможны те особые случаи, что характерны для синхронной версии (блокировка освобождается между вызовами Atomics.compareExchange() и Atomics.waitAsync(); блокировку захватывает другой поток, делая это между моментами разрешения промиса и вызова Atomics.compareExchange()). Поэтому в подобном коде, применимом в реальных проектах, это необходимо учесть.

Итоги


В этом материале мы рассказали о низкоуровневых примитивах синхронизации Atomics.wait(), Atomics.waitAsync() и Atomics.notify(). Мы разобрали пример создания на их основе мьютекса, который можно применять и в главном потоке, и в потоках воркеров.

Пригодятся ли в ваших проектах Atomics.wait(), Atomics.waitAsync() и Atomics.notify()?
Источник: https://habr.com/ru/company/ruvds/blog/522952/


Интересные статьи

Интересные статьи

Есть несколько способов добавить водяной знак в Битрикс. Рассмотрим два способа.
Продолжаем обзор замечательной тулы для разработки под Windows и не только, Azure DevOps. На этот раз, намучавшись с переменными окружения я решил вынести весь опыт в одну статью. Начиная от того...
Как-то у нас исторически сложилось, что Менеджеры сидят в Битрикс КП, а Разработчики в Jira. Менеджеры привыкли ставить и решать задачи через КП, Разработчики — через Джиру.
Статья посвящена разработке Helm-чартов для Kubernetes с использованием готовых решений из репозиториев чартов. При таком подходе пользователь применяет рецепты сообщества или свои собственны...
Материал, перевод которого мы сегодня публикуем, посвящён процессу разработки системы визуализации динамических древовидных диаграмм. Для рисования кубических кривых Безье здесь используется техн...