Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
TL;DR: Порядок важен. Операторы довольно атомарны и зачастую очень просты, но это не мешает им объединяться в сложные последовательности, в которых легко допустить ошибку. Давайте разберемся.
Будет очень много marble диаграмм, извините. Пара ресурсов для тех, кто не в курсе про marble диаграммы: How to Read an RxJS Marble Diagram и Testing RxJS Code with Marble Diagrams. В некоторых очень спорных местах я добавил подсказки: ↑
подскажет, когда конкретно произошло событие; ⇈
означает, что произошло два синхронных события подряд.
Простая последовательность
Начнем с обычного интервала.
interval(1000).subscribe(console.log);
Его диаграмма будет такой:
// interval(1000)
-----0-----1-----2-----3-----4--->
Интервал выводит числа по возрастанию, давайте прокачаем его и добавим немного случайности с помощью Math.random()
:
interval(1000)
.pipe(map(() => Math.random()))
.subscribe(console.log);
Для упрощения диаграммы, будем считать, что Math.random()
возвращает целые числа.
Обновим диаграмму:
// interval(1000)
-----0-----1-----2-----3-----4--->
// map(map(() => Math.random())
-----1-----6-----3-----2-----9--->
Ждать первое значение целую секунду — это долго. Хочется сразу после подписки видеть значение в консоли. Есть прекрасный оператор startWith
. Добавим его в нашу цепочку операторов:
interval(1000)
.pipe(
map(() => Math.random()),
startWith(-1),
)
.subscribe(console.log);
Можете предположить результат?
Помните, я говорил, что порядок важен. Это как раз хороший пример для данного утверждения. Давайте посмотрим на диаграмму:
// interval(1000)
-----0-----1-----2-----3-----4--->
// map(() => Math.random())
-----1-----6-----3-----2-----9--->
// startWith(-1)
(-1)-1-----6-----3-----2-----9--->
↑ ↑
Упс. Значение добавляется после нашей функции случайности. Название немного сбивает, и, мне кажется, что используя startWith
в первый раз, я поставил этот оператор не туда.
Как это работает: под капотом оператор создаёт новый стрим, который передается в следующий оператор. Поэтому получается, что startWith
принимает стрим, который пришел из map
, и уже в этот стрим записывается первое значение.
Окей, теперь зная это, поправим код так, чтобы все цифры проходили через map
, в том числе и результат startWith
.
interval(1000)
.pipe(
startWith(-1),
map(() => Math.random()),
)
.subscribe(console.log);
Получим такую диаграмму:
// interval(1000)
-----0-----1-----2-----3-----4--->
// startWith(-1)
(-1)-0-----1-----2-----3-----4--->
↑ ↑
// map(() => Math.random())
2----1-----6-----3-----2-----9--->
Перфекто.
У меня есть к вам вопрос: что будет в консоли (или как будет выглядеть диаграмма) при такой последовательности операторов?
interval(1000)
.pipe(
startWith(-1),
map(() => Math.random()),
startWith('a'),
)
.subscribe(console.log);
Операторы же выполняются по порядку. Ведь так? Ведь так!?
Да, все так. Только надо внимательно следить за происходящим и помнить: каждый оператор создает новый поток. Разберем по шагам:
Создаем поток из
interval(1000)
;К этому потоку
startWith
добавляет в самое начало-1
;Выполняем
Math.random()
;К потоку из предыдущего шага следующий
startWith
в самое начало добавляет'a'
;Сразу после подписки мы увидим
'a'
, следом за ним будет результатMath.random()
, который выполнился из-за-1
. Все это будет происходить синхронно.Остальные значения будут выводиться асинхронно в консоли раз в секунду.
Диаграмма все упростит (надеюсь):
// interval(1000)
-----0-----1-----2-----3-----4--->
// startWith(-1)
(-1)-0-----1-----2-----3-----4--->
↑ ↑
// map(() => Math.random())
2----1-----6-----3-----2-----9--->
// startWith('a')
a2---1-----6-----3-----2-----9--->
⇈
Порядок операторов важен. Но иногда бывает не так очевидно, что придет в subscribe
, в какой последовательности и почему.
shareRelay
Есть операторы (или группы операторов), которые требуют к себе особого внимания: их неправильное использование может привести к утечкам памяти.
Начнем с группы операторов шаринга состояния. Все примеры будут с оператором shareRelay
, но это применимо и к другим операторам и их комбинациям из «sharing группы».
Мой общий совет звучит так: shareRelay
должен быть последним оператором в .pipe()
. Если вы располагаете его в другом месте, либо вы знаете, зачем он там нужен, либо совершаете ошибку. Разберемся почему.
Взглянем на shareRelay
, точнее, на его отсутствие:
const randomTimer = interval(1000).pipe(map(() => Math.random()));
randomTimer.subscribe(console.log);
randomTimer.subscribe(console.log);
Каждый subscribe
создает свой поток интервалов, который потом преобразуется с помощью Math.random()
. Получается, что каждую секунду мы будем видеть 2 цифры в консоли, но они будут разные.
// subscribe
-----3-----7-----1-----8-----9--->
// subscribe
-----5-----2-----1-----7-----0--->
Если мы сделаем подписку на один из потоков асинхронной:
const randomTimer = interval(1000).pipe(map(() => Math.random()));
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 500);
то каждая подписка будет писать в консоль число независимо от предыдущей. Будет такая диаграмма результата:
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
----5-----2-----1-----7-----0->
Обратите внимание, что вторая подписка начинается не сразу.
Если мы хотим во всех подписках использовать один и тот же интервал, а не создавать его каждый раз заново, то без shareRelay
не обойтись:
const randomTimer = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 }),
map(() => Math.random()),
);
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 500);
Угадаете, какой будет диаграмма?
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
--5-----2-----1-----7-----0--->
Да, мы подписываемся с задержкой в 500мс. Но так как мы шарим интервал, задержка не важна: значения будут отображаться в консоли одновременно. Давайте поменяем интервал на 1500мс:
const randomTimer = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 }),
map(() => Math.random()),
);
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 1500);
Диаграмма:
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
5--2-----1-----7-----0--->
shareReplay
работает таким образом, что запоминает последнее значение, и, если оно было, мы получаем его мгновенно, без задержек. А все последующие значения будут отображаться во время срабатывания таймера.
Идем дальше. А что если нам надо шарить результат и Math.random()
в том числе? Надо поместить shareReplay
чуть-чуть подальше:
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
);
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 1500);
Думаю, что диаграмма окажется вполне очевидной:
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
3--7-----1-----8-----9--->
В наших примерах, когда shareRelay
находился перед map
, это был баг, а не фича. Как я упоминал ранее, операторы, подобные shareRelay
, в большинстве случаев надо использовать в конце пайпа. Я обычно добавляю этот оператор в конце каждого пайпа.
takeUntil и автоматическая отписка
* Сейчас я буду говорить про один из классических для Angular подходов автоматической отписки. Если вы используете RxJs в хуках React'а и/или используете другой подход автоматической отписки, то это правило именно в такой формулировке к вам не применимо. Об использовании takeUntil
по другому поговорим чуть позже. На этом подходе можно хорошо показать важность правильного расположения операторов друг между другом.
Перейдем ко второй группе: операторы завершения потока. Примеры будут с takeUntil
, но это применимо также и к другим операторам аналогичной группы.
К takeUntil
в рамках Angular я тоже применяю одно общее правило использования: takeUntil
должен быть последним оператором перед subscribe
и должен быть всегда.
Почему так? Разберем на примере одного из типовых применений takeUntil
в Angular-компонентах, но с неправильным порядком операторов.
class MyComponent {
private destroy$ = new ReplaySubject(1);
ngOnInit() {
interval(1000)
.pipe(
takeUntil(this.destroy$),
switchMap(() => this.apiService.ping()),
)
.subscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Что тут произойдет? Если в момент того, как срабатывает destroy$
, запрос находится ещё в процессе, то мы его не отменим. И, кстати, мы не можем гарантировать, что поток возвращаемый методом ping()
, когда-нибудь завершится. А это уже выглядит как утечка памяти.
Надо сделать правильный порядок вещей:
ngOnInit() {
interval(1000)
.pipe(
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
)
.subscribe();
}
Теперь никаких утечек