Всем привет. На связи Омельницкий Сергей. Не так давно я вел стрим по реактивному программированию, где рассказывал про асинхронность в JavaScript. Сегодня я бы хотел законспектировать этот материал.
Но перед тем как начать основной материал нам нужно сделать вводную. Итак, давайте начнем с определений: что такое стек и очередь?
Стек — это коллекция, элементы которой получают по принципу «последний вошел, первый вышел» LIFO
Очередь — это коллекция, элементы которой получают по принципу («первый вошел, первый вышел» FIFO
Окей, продолжим.
JavaScript — это однопоточный язык программирования. Это значит, что он имеется только один поток выполнения и один стек, в который помещаются функции в очередь на выполнение. Следовательно в один момент времени JavaScript может выполнить только одну операцию, другие операции при этом будут ждать своей очереди в стеке, пока их не вызовут.
Стек вызовов — это структура данных, которая, упрощенно говоря, записывает сведения о месте в программе, где мы находимся. Если мы переходим в функцию, мы помещаем запись о ней в верхнюю часть стека. Когда мы из функции возвращаемся, мы вытаскиваем из стека самый верхний элемент и оказываемся там, откуда вызывали эту функцию. Это — всё, что умеет стек. А теперь крайне интересный вопрос. Как тогда работает асинхронность в JavasScript?
На самом деле помимо стека в браузерах присутствует особая очередь для работы с так называемым WebAPI. Функции из этой очереди выполнятся по порядку только после того, как стек будет полностью очищен. Только после этого они помещаются из очереди в стек на выполнение. Если в стеке в данный момент находится хотя бы один элемент, то они в стек попасть не могут. Как раз именно из-за этого вызов функций по таймаута часто бывает не точным по времени, так как функция не может попасть из очереди в стек, пока он заполнен.
Рассмотрим следующий пример и займёмся его пошаговым «выполнением». Также рассмотрим посмотрим, что при этом происходит в системе.
console.log('Hi);
setTimeout(function cb1() {
console.log('cb1');
}, 5000);
console.log('Bye');
1) Пока ничего не происходит. Консоль браузера чиста, стек вызовов пуст.
2) Потом команда console.log('Hi') добавляется в стек вызовов.
3) И она выполняется
4) Затем console.log('Hi') удаляется из стека вызовов.
5) Теперь переходим к команде setTimeout(function cb1() {… }). Она добавляется в стек вызовов.
6) Команда setTimeout(function cb1() {… }) выполняется. Браузер создаёт таймер, являющийся частью Web API. Он будет выполнять обратный отсчёт времени.
7) Команда setTimeout(function cb1() {… }) завершила работу и удаляется из стека вызовов.
8) Команда console.log('Bye') добавляется в стек вызовов.
9) Команда console.log('Bye') выполняется.
10) Команда console.log('Bye') удаляется из стека вызовов.
11) После того, как пройдут, как минимум, 5000 мс., таймер завершает работу и помещает коллбэк cb1 в очередь коллбэков.
12) Цикл событий берёт c функцию cb1 из очереди коллбэков и помещает её в стек вызовов.
13) Функция cb1 выполняется и добавляет console.log('cb1') в стек вызовов.
14) Команда console.log('cb1') выполняется.
15) Команда console.log('cb1') удаляется из стека вызовов.
16) Функция cb1 удаляется из стека вызовов.
Взглянем на пример в динамике:
Ну вот мы и рассмотрели как в JavaScript реализована асинхронность. Теперь давайте кратко поговорим об эволюции асинхронного кода.
Эволюция асинхронного кода.
a(function (resultsFromA) {
b(resultsFromA, function (resultsFromB) {
c(resultsFromB, function (resultsFromC) {
d(resultsFromC, function (resultsFromD) {
e(resultsFromD, function (resultsFromE) {
f(resultsFromE, function (resultsFromF) {
console.log(resultsFromF);
})
})
})
})
})
});
Асинхронное программирование, каким мы его знаем в JavaScript, может быть реализовано только функциями. Они могут быть переданы как любая другая переменная другим функциям. Так родились коллбэки. И это прикольно, весело и задорно, пока не превращается в грусть, тоску и печаль. Почему? Да все просто:
- С ростом сложности кода, проект быстро превращается в малопонятные многократно вложенные блоки — «callback hell».
- Обработку ошибок можно легко упустить.
- Нельзя возвращать выражения с return.
С появлением Promise обстановка стала чуть лучше.
new Promise(function(resolve, reject) {
setTimeout(() => resolve(1), 2000);
}).then((result) => {
alert(result);
return result + 2;
}).then((result) => {
throw new Error('FAILED HERE');
alert(result);
return result + 2;
}).then((result) => {
alert(result);
return result + 2;
}).catch((e) => {
console.log('error: ', e);
});
- Появились цепочки промисов, что улучшило читаемость кода
- Появился отдельный метод перехвата ошибок
- Появилась возможность параллельного выполнения с помощью Promise.all
- Вложенную асинхронность мы можем решить с помощью async/await
Но у промиса есть свои ограничения. К примеру промис, без танцев с бубном, нельзя отменить, а что самое главное — работает с одним значением.
Ну вот мы и плавно подошли к реактивному программированию. Устали? Ну благо дело можно можно пойти заварить чаек, обмозговать и вернуться читать далее. А я продолжу.
Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений. Давайте более детально разберем что такое поток данных.
// Получаем ссылку на элемент
const input = ducument.querySelector('input');
const eventsArray = [];
// Пушим каждое событие в массив eventsArray
input.addEventListener('keyup',
event => eventsArray.push(event)
);
Представим, что у нас есть поле ввода. Мы создаем массив, и на каждый keyup события input мы будем сохранять событие в нашем массиве. При этом хотелось бы отметить, что наш массив отсортирован по времени т.е. индекс более поздних событий больше, чем индекс более ранних. Такой массив представляет собой упрощенную модель потока данных, но это еще не поток. Для того чтоб этот массив можно было смело назвать потоком он должен уметь каким-то образом сообщать подписчикам, что в него поступили новые данные. Таким образом мы подошли к определению потока.
Поток данных
const { interval1 } = Rx;
const { take } = RxOperators;
interval(1000).pipe(
take(4)
)
Поток — это массив данных, отсортированных по времени, который может сообщать о том, что данные изменились. А теперь представьте как удобно становится писать код, в котором на одно действие потребуется вызывать несколько событий в разных участках кода. Мы просто делаем подписку на поток и он нам сам сообщит когда произойдут изменения. И это умеет делать библиотека RxJs.
RxJS — это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей. Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array).
Давайте разберемся с основными понятиями этой библиотеки.
Observable, Observer, Producer
Observable — первый базовый тип, который мы рассмотрим. Этот класс содержит в себе основную часть реализации RxJs. Он связан с наблюдаемым потоком, на который можно как подписаться с помощью метода subscribe.
В Observable реализуется вспомогательный механизм для создания обновлений, так называемый Observer. Источником значений для Observer называется Producer. Это может быть массив, итератор, web socket, какое-то событие и т.п. Так что можно сказать, что observable является проводником между Producer и Observer.
Observable обрабатывает три вида событий Observer:
- next – новые данные
- error – ошибку, если последовательность завершилась по причине исключительной ситуации. это событие так же предполагает завершение последовательности.
- complete — сигнал о завершении последовательности. Это означает, что новых данных больше не будет
Посмотрим демо:
В начале мы обработаем значения 1, 2, 3, а спустя 1 сек. мы получим 4 и завершим наш поток.
И тут я понял, что рассказывать было интересней чем писать об этом. :D
Subscription
Когда мы делаем подписку на поток, мы создаем новый класс subscription, который дает нам возможность отменить подписку с помощью метода unsubscribe. Так же мы можем сгруппировать подписки с помощью метода add. Ну и логично, что мы можем разгруппировать потоки с помощью remove. Методы add и remove на вход принимают другую подписку. Хотелось бы отметить, что когда мы делаем отписку, то мы отписываемся от всех дочерних подписок как будто бы и у них вызывали метод unsubscribe. Идем дальше.
Виды потоков
HOT | COLD |
---|---|
Producer создается снаружи observable | Producer создается внутри observable |
Данные передаются в момент создания observable | Данные сообщаются в момент подписки |
Нужна дополнительная логика для отписки | Поток завершается самостоятельно |
Использует связь один-к-многим | Использует связь вида один-к-одному |
Все подписки имеют единое значение | Подписки независимы |
Данные можно потерять, если нет подписки | Переиздает все значения потока для новой подписки |
Если приводить аналогию, то я бы представил горячий поток как фильм в кинотеатре. В какой момент времени ты пришел, с того момента и начал просмотр. Холодный поток я бы сравнил со звонком в тех. поддержку. Любой позвонивший слушает запись автоответчика от начала до конца, но ты можешь бросить трубку с помощью unsubscribe.
Хотелось бы отметить, что существуют еще так называемые warm потоки ( такое определение я встречал крайне редко и только на зарубежных сообществах ) — это поток, который трансформируется из холодного потока в горячий. Возникает вопрос — где использовать)) Приведу пример из практики.
Я работаю с ангуляром. Он активно использует rxjs. Для получения данных на сервер я ожидаю холодный поток и этот поток использую в шаблоне с помощью asyncPipe. Если я использую этот пайп несколько раз, то, возвращаясь к определению холодного потока, каждый pipe будет запрашивать данные с сервера, что мягко говоря странно. А если я преобразую холодный поток в теплый, то запрос произойдет единожды.
Вообще понимание вида потоков достаточно сложна для начинающих, но важна.
Operators
return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
.pipe(
tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
map(({ data }: TradeCompanyList) => data)
);
Расширить возможность работы с потоками нам предоставляют операторы. Они помогают контролировать события, протекающие в Observable. Мы рассмотрим парочку наиболее популярных, а подробнее с операторами можно ознакомиться по ссылкам в полезной информации.
Operators — of
Начнем со вспомогательного оператора of. Он создает Observable на основе простого значения.
Operators — filter
Оператор фильтрации filter, как можно понять по названию, фильтрует сигнал потока. Если оператор возвращает истину, то пропускает далее.
Operators — take
take — Принимает значение кол-ва эмитов, после которого завершает поток.
Operators — debounceTime
debounceTime — отбрасывает испускаемые значения, которые попадают в указанный промежуток времени между выходными данными — по прошествии временного интервала эмитит последнее значение.
const { Observable } = Rx;
const { debounceTime, take } = RxOperators;
Observable.create((observer) => {
let i = 1;
observer.next(i++);
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next(i++)
}, 1000);
// Испускаем значение раз в 1500мс
setInterval(() => {
observer.next(i++)
}, 1500);
}).pipe(
debounceTime(700), // Ожидаем 700мс значения прежде чем обработать
take(3)
);
Operators — takeWhile
Эмитит значения, пока takeWhile не вернет false, после чего отпишется от потока.
const { Observable } = Rx;
const { debounceTime, takeWhile } = RxOperators;
Observable.create((observer) => {
let i = 1;
observer.next(i++);
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next(i++)
}, 1000);
}).pipe(
takeWhile( producer => producer < 5 )
);
Operators — combineLatest
Комбинированный оператор combineLatest чем-то похож на promise.all. Он объединяет несколько потоков в один. После того как каждый поток сделает хотя бы один эмит, мы получаем последние значения от каждого в виде массива. Далее, после любо любого эмита из объединённых потоков он будет отдавать новые значения.
const { combineLatest, Observable } = Rx;
const { take } = RxOperators;
const observer_1 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next('a: ' + i++);
}, 1000);
});
const observer_2 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 750мс
setInterval(() => {
observer.next('b: ' + i++);
}, 750);
});
combineLatest(observer_1, observer_2).pipe(take(5));
Operators — zip
Zip — ждет значение из каждого потока и формирует массив на основе этих значений. Если значение не придет из какого-либо потока, то группа не будет сформирована.
const { zip, Observable } = Rx;
const { take } = RxOperators;
const observer_1 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next('a: ' + i++);
}, 1000);
});
const observer_2 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 750
setInterval(() => {
observer.next('b: ' + i++);
}, 750);
});
const observer_3 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 500
setInterval(() => {
observer.next('c: ' + i++);
}, 500);
});
zip(observer_1, observer_2, observer_3).pipe(take(5));
Operators — forkJoin
forkJoin также объединяет потоки, но он емитнит значение только когда все потоки будут завершены (complete).
const { forkJoin, Observable } = Rx;
const { take } = RxOperators;
const observer_1 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next('a: ' + i++);
}, 1000);
}).pipe(take(3));
const observer_2 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 750
setInterval(() => {
observer.next('b: ' + i++);
}, 750);
}).pipe(take(5));
const observer_3 = Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 500
setInterval(() => {
observer.next('c: ' + i++);
}, 500);
}).pipe(take(4));
forkJoin(observer_1, observer_2, observer_3);
Operators — map
Оператор трансформации map преобразует значение эмита в новое.
const { Observable } = Rx;
const { take, map } = RxOperators;
Observable.create((observer) => {
let i = 1;
// Испускаем значение раз в 1000мс
setInterval(() => {
observer.next(i++);
}, 1000);
}).pipe(
map(x => x * 10),
take(3)
);
Operators – share, tap
Оператор tap - позволяет делать side эффекты, то есть какие-либо действия, которые не влияют на последовательность.
Утилитный оператор share способен из холодного потока сделать горячим.
С операторами закончили. Перейдем к Subject.
И тут я пошел чаек пить. Утомили меня эти примеры :D
Семейство subject-ов
Семейство subject-ов являются ярким примером горячих потоков. Эти классы являются неким гибридом, которые выступают одновременно в роли observable и observer. Так как subject является горячим потоком, то от него необходимо отписываться. Если говорить по основным методам, то это:
- next – передача новых данных в поток
- error – ошибка и завершение потока
- complete – завершение потока
- subscribe – подписаться на поток
- unsubscribe – отписаться от потока
- asObservable – трансформируем в наблюдателя
- toPromise – трансформирует в промис
Выделяют 4 5 типов subject-ов.
На стриме говорил 4, а оказалось они еще один добавили. Как говорится век живи век учись.
Простой Subject new Subject()
– самый простой вид subject-ов. Создается без параметров. Передает значения пришедшие только после подписки.
BehaviorSubject new BehaviorSubject( defaultData<T> )
– на мой взгляд самый распространённый вид subject-ов. На вход принимает значение по умолчанию. Всегда сохраняет данные последнего эмита, которые передает при подписке. Данный класс имеет так же полезный метод value, который возвращает текущее значение потока.
ReplaySubject new ReplaySubject(bufferSize?: number, windowTime?: number)
— На вход опционально может принять первым аргументом размер буфера значений, которые он будет в себе хранить, а вторым время в течении которого нам нужны изменения.
AsyncSubject new AsyncSubject()
— при подписке ничего не происходит, и значение будет возвращено только при complete. Будет возвращено только последнее значение потока.
WebSocketSubject new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>)
— О нем документация молчит и я сам его в первый раз вижу. Кто знает что он делает пишите, дополним.
Фуф. Ну вот мы и рассмотрели все, что я хотел сегодня рассказать. Надеюсь данная информация была полезной. Самостоятельно ознакомиться со списком литературы можно во вкладке полезная информация.
Полезная информация
- Ссылка на стрим
- Как работает JS: обзор движка, механизмов времени выполнения, стека вызовов
- Как работает JS: цикл событий, асинхронность и пять способов улучшения кода с помощью async / await
- Как работает Event Loop в JavaSript
- Эволюция асинхронного JavaScript
- Что такое RxJS и почему о нём полезно знать
- Практическое применение RxJS
- https://coursetro.com/posts/code/148/RxJS-Observables-Tutorial---Creating-&-Subscribing-to-Observables — RxJS Observables Tutorial — Creating & Subscribing to Observables
- RXJS: Hot and Cold Observables
- Классы, функции для создания Observable. Операторы.
- RxJS Operators By Example
- API List
- Разновидности Subject и Расписания в RxJS