Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Привет, друзья!
Представляю вашему вниманию перевод второй части этой замечательной статьи.
Ссылка на первую часть.
Веб-потоки (web streams) — это стандарт для потоков (streams), который поддерживается всеми основными веб-платформами: веб-браузерами, Node.js и Deno. Потоки — это абстракция для чтения и записи данных последовательно, небольшими частями из любого вида источника — файлов, данных, находящихся на сервере, и т.д.
Например, глобальная функция fetch (которая используется для загрузки онлайн-ресурсов) асинхронно возвращает ответ (Response
), содержащий свойство body
с веб-потоком.
В данной статье рассматриваются веб-потоки в Node.js
, но то, о чем мы будем говорить, применимо к любой поддерживающей их платформе.
Содержание
5. Превращение приемников данных в WS
с помощью упаковки
5.1. Пример: трассировка RS
- 5.2. Пример: формирование строки из записываемых чанков
6. Использование TS
6.1. Стандартные TS
6.1.1. Пример: декодирование потока байтов в кодировке UTF-8
- 6.1.2. Пример: создание доступного для чтения строкового потока для стандартного входа
7. Реализация кастомных TS
- 7.1. Пример: преобразование потока произвольных чанков в поток строк
- 7.2. Асинхронные генераторы также отлично подходят для преобразования потока
- 8. Детали противодавления
- 8.1. Сигнализация противодавления
- 8.2. Обработка противодавления
8.2.1. Код, записывающий данные в WS
черезWriter
8.2.2. Базовый источник RS
8.2.3. Базовый приемник WS
8.2.4. TS
(writable-readable)8.2.5. pipeTo()
(RS -> WS)
- 9. Потоки байтов
- 9.1. Доступный для чтения поток байтов
- 9.2. Пример: бесконечный доступный для чтения поток байтов, заполняемый произвольными данными
- 9.3. Пример: сжатие доступного для чтения потока байтов
9.4. Пример: чтение веб-страницы с помощью fetch()
10. Специфичные для Node.js
вспомогательные функции
5. Превращение приемников данных в WS
с помощью упаковки
Для того, чтобы иметь возможность записывать данные во внешний приемник через WS
, этот источник необходимо обернуть в объект-адаптер, который называется базовым приемником (underlying sink):
new WritableStream(underlyingSink?, queuingStrategy?);
Базовые приемники имеют следующую сигнатуру:
interface UnderlyingSink<TChunk> {
start?(
controller: WritableStreamDefaultController
): void | Promise<void>;
write?(
chunk: TChunk,
controller: WritableStreamDefaultController
): void | Promise<void>;
close?(): void | Promise<void>;
abort?(reason?: any): void | Promise<void>;
}
Свойства:
start(controller)
: вызывается сразу после вызова конструктораWS
. Для асинхронных операций можно возвращать промис. Данный метод позволяет подготовиться к записи;write(chunk, controller)
: вызывается при готовности нового чанка к записи в приемник. Здесь можно отслеживать противодавление, возвращая промис, который разрешается при деактивации противодавления;close()
: вызывается после вызоваwriteStream.close()
и записи всех чанков из очереди. Данный метод позволяет выполнять очистку после записи;abort(reason)
: вызывается в случае вызоваwriteStream.abort()
илиwriter.abort()
.reason
— это значение, переданное этим методам.
Параметр controller
методов start
и write
позволяет переводить WS
в состояние ошибки. Он имеет следующую сигнатуру:
interface WritableStreamDefaultController {
readonly signal: AbortSignal;
error(err?: any): void;
}
signal
:AbortSignal
, позволяющий прерывать запись или закрывать операцию при закрытии потока;error(err)
: закрываетWS
, последующие взаимодействия с ним буду проваливаться со значениемerr
.
5.1. Пример: трассировка RS
В следующем примере RS
подключается к WS
для отслеживания того, как RS
производит чанки:
const readableStream = new RS({
start(controller) {
controller.enqueue("Первый чанк");
controller.enqueue("Второй чанк");
controller.close();
},
});
await readableStream.pipeTo(
new WritableStream({
write(chunk) {
console.log("ЗАПИСЬ " + JSON.stringify(chunk));
},
close() {
console.log("ЗАКРЫТИЕ");
},
abort(err) {
console.log("ПРЕРЫВАНИЕ " + err);
},
})
);
/**
* ЗАПИСЬ "Первый чанк"
* ЗАПИСЬ "Второй чанк"
* ЗАКРЫТИЕ
*/
5.2. Пример: формирование строки из записываемых чанков
В следующем примере мы создаем подкласс WS
, который собирает все записываемые чанки в строку. Метод getString
обеспечивает доступ к строке:
class WritableStringStream extends WS {
#string = "";
constructor() {
super({
// Нам нужен доступ к `this` `WritableStringStream`,
// поэтому мы используем стрелочную функцию
write: (chunk) => {
this.#string += chunk;
},
});
}
getString() {
return this.#string;
}
}
const stringStream = new WritableStringStream();
const writer = stringStream.getWriter();
try {
await writer.write("Как");
await writer.write(" твои ");
await writer.write(" дела?");
await writer.close();
} finally {
writer.releaseLock()
}
assert.equal(
stringStream.getString(),
"Как твои дела?"
);
Недостатком данного подхода является смешение API
: WS API
и API
нашего строкового потока. Альтернативой является делегирование ответственности WS
вместо его расширения:
function createWritableStringStream() {
let string = "";
return {
stream: new WS({
write(chunk) {
string += chunk;
},
}),
getString() {
return string;
},
};
}
const stringStream = createWritableStringStream();
const writer = stringStream.stream.getWriter();
try {
await writer.write("Как");
await writer.write(" твои ");
await writer.write(" дела?");
await writer.close();
} finally {
writer.releaseLock()
}
assert.equal(
stringStream.getString(),
"Как твои дела?"
);
Эта функциональность также может быть реализована с помощью класса (вместо фабричной функции для объектов).
6. Использование TS
TS
:
- получает входные данные через сторону для записи (writable side),
WS
; - может преобразовывать входные данные;
- позволяет читать результат через сторону для чтения (readable side),
RS
.
Основным способом применения TS
является пропускание через них данных (pipe through) для преобразования:
const transformedStream = readableStream.pipeThrough(transformStream);
pipeThrough()
подключает RS
к стороне для записи TS
и возвращает его сторону для чтения. Другими словами, создается новый RS
, который является преобразованной версией RS
.
pipeThrough()
принимает не только TS
, но также любой объект, соответствующий такому контракту:
interface ReadableWritablePair<RChunk, WChunk> {
readable: RS<RChunk>;
writable: WS<WChunk>;
}
6.1. Стандартные TS
Node.js
поддерживает следующие стандартные TS
:
- Кодирование (стандарт WHATWG) — TextEncoderStream и TextDecoderStream:
- данные потоки поддерживают не только
UTF-8
, но и многие устаревшие кодировки; - единичная кодовая точка (code point) Юникода кодируется в единицы кода
UFT-8
, размером до 4 байт. В байтовый потоках единицы кода могут разделяться на чанки.TextDecoderStream
обеспечивает их правильную обработку; - доступны на большинстве
JS-платформ
;
- данные потоки поддерживают не только
- Потоки для сжатия (групповой черновик сообщества W3C) — CompressionStream и DecompressionStream:
- поддерживаемые форматы сжатия:
deflate
,deflate-raw
,gzip
; - доступны на многих
JS-платформах
.
- поддерживаемые форматы сжатия:
6.1.1. Пример: декодирование потока байтов в кодировке UTF-8
const response = await fetch("https://example.com");
const readableByteStream = response.body;
const readableStream = readableByteStream
.pipeThrough(new TextDecoderStream("utf-8"));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
response.body
— это ReadableByteStream
(доступный для чтения поток байтов), чанки которого являются экземплярами Uint8Array
. Мы пропускаем этот поток через TextDecoderStream
для получения потока, содержащего строковые чанки.
Обратите внимание: декодирование каждого байтового чанка по отдельности (например, с помощью TextDecoder
) работать не будет, поскольку байты единицы кода могут находиться в разных чанках.
6.1.2. Пример: создание доступного для чтения строкового потока для стандартного входа
Следующий модуль Node.js
выводит в терминал все, что получает через стандартный вход:
import { Readable } from "node:stream";
const webStream = Readable.toWeb(process.stdin)
.pipeThrough(new TextDecoderStream("utf-8"));
for await (const chunk of webStream) {
console.log(">>>", chunk);
}
Доступ к стандартному входу можно получить через поток, хранящийся в process.stdin
. Без указания кодировки для потока и его преобразования с помощью Readable.toWeb()
создается байтовый поток. Он пропускается через TextDecoderStream
для получения текстового потока.
Обратите внимание: мы обрабатываем стандартный вход инкрементально — как только очередной чанк становится доступным, он выводится в терминал. Другими словами, нам не нужно ждать завершения ввода. Это может быть полезным при большом количестве данных или в случае, когда данные приходят с перерывами.
7. Реализация кастомных TS
Кастомный TS
можно реализовать путем передачи объекта Transformer
в конструктор TS
. Данный объект имеет следующую сигнатуру:
interface Transformer<TInChunk, TOutChunk> {
start?(
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
transform?(
chunk: TInChunk,
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
flush?(
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
}
Свойства:
start(controller)
: вызывается сразу после вызова конструктораTS
. Данный метод позволяет подготовиться к преобразованиям;transform(chunk, controller)
: выполняет преобразования. Получает входной чанк и может использовать его параметрcontroller
для помещения в очередь одного или нескольких преобразованных выходных чанков. Также может ничего не помещать в очередь;flush(controller)
: вызывается после успешного преобразования всех чанков. Данный метод позволяет выполнять очистку.
Каждый из этих методов может возвращать промис, до разрешения которого выполнение кода приостанавливается. Это может быть полезным при выполнении асинхронных операций.
Параметр controller
имеет следующую сигнатуру:
interface TransformStreamDefaultController<TOutChunk> {
enqueue(chunk?: TOutChunk): void;
readonly desiredSize: number | null;
terminate(): void;
error(err?: any): void;
}
Свойства:
enqueue(chunk)
: помещает чанк в сторону для чтения (вывод)TS
;desiredSize
: возвращает желаемый размер внутренней очереди стороны для чтения (вывода)TS
;terminate()
: закрывает сторону для чтения (вывод) и переводит сторону для записи (вход)TS
в состояние ошибки. Может использоваться в случае, когда преобразователь не заинтересован в оставшихся чанках стороны для записи (входа) и хочет их пропустить;error(err)
: переводитTS
в состояние ошибки, последующие взаимодействия с ним будут проваливаться со значениемerr
.
Что насчет противодавления? Противодавление передается от стороны для чтения (выхода) к стороне для записи (вход). Обычно, преобразования не сильно влияют на количество данных. Поэтому по умолчанию преобразователи игнорируют противодавление. Тем не менее, оно может быть обнаружено через transformStreamDefaultController.desiredSize
и передано посредством возврата промиса из transformer.transform()
.
7.1. Пример: преобразование потока произвольных чанков в поток строк
Следующий подкласс TS
преобразует поток произвольных чанков в поток, где каждый чанк занимает ровно одну строку. Каждый чанк, за исключением последнего, заканчивается символом \n
в Unix
и символами \r\n
в Windows
:
class ChunksToLinesTransformer {
#previous = "";
transform(chunk, controller) {
let startSearch = this.#previous.length;
this.#previous += chunk;
while (true) {
// Работает для EOL === "\n" и EOL === "\r\n"
const eolIndex = this.#previous.indexOf("\n", startSearch);
if (eolIndex < 0) break;
// Строка включает EOL
const line = this.#previous.slice(0, eolIndex + 1);
controller.enqueue(line);
this.#previous = this.#previous.slice(eolIndex + 1);
startSearch = 0;
}
}
flush(controller) {
// Выполняем очистку и помещаем в очередь оставшийся текст
if (this.#previous.length > 0) {
controller.enqueue(this.#previous);
}
}
}
class ChunksToLinesStream extends TS {
constructor() {
super(new ChunksToLinesTransformer());
}
}
const stream = new RS({
async start(controller) {
controller.enqueue("несколько\nстрок\nтекста");
controller.close();
},
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);
for await (const line of transformed) {
console.log(">>>", JSON.stringify(line));
}
/**
* >>> "несколько\n"
* >>> "строк\n"
* >>> "текста"
*/
Обратите внимание: Deno
имеет встроенный TextLineStream со схожим функционалом.
7.2. Асинхронные генераторы также отлично подходят для преобразования потока
Поскольку RS
являются асинхронно перебираемыми, мы можем использовать асинхронные генераторы для их преобразования. Это делает код очень элегантным:
const stream = new RS({
async start(controller) {
controller.enqueue("раз");
controller.enqueue("два");
controller.enqueue("три");
controller.close();
},
});
async function* prefixChunks(prefix, asyncIterable) {
for await (const chunk of asyncIterable) {
yield "> " + chunk;
}
}
const transformedAsyncIterable = prefixChunks("> ", stream);
for await (const transformedChunk of transformedAsyncIterable) {
console.log(transformedChunk);
}
/**
* > раз
* > два
* > три
*/
8. Детали противодавления
Рассмотрим такой конвейер:
rs.pipeThrough(ts).pipeTo(ws);
Подключения, создаваемые этим выражением (pipeThrough()
использует pipeTo()
для подключения rs
к стороне для записи ts
):
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
Наблюдения:
- базовый источник
rs
может быть представлен как элемент цепочки, предшествующийrs
; - базовый приемник
ws
может быть представлен как элемент цепочки, следующий заws
; - каждый поток имеет внутренний буфер: буферы
RS
следуют за базовыми источниками, буферыWS
находятся перед базовыми приемниками.
Предположим, что базовый приемник ws
является медленным, что привело к заполнению буфера ws
. В этом случае происходит следующее:
ws
сигнализирует о заполненности;pipeTo
прекращает чтение данных изts.readable
;ts.readable
сигнализирует о заполненности;ts
прекращает передачу чанков изts.writable
вts.readable
;ts.writable
сигнализирует о заполненности;pipeTo
прекращает читать данные изrs
;rs
сообщает о заполненности базовому источнику;- базовый источник приостанавливается.
Этот пример показывает, что нам требуется 2 вида функционала:
- сущности, получающие данные, должны иметь возможность посылать сигнал противодавления;
- сущности, отправляющие данные, должны реагировать на сигналы противодавления.
Посмотрим, как данный функционал реализуется в веб-потоках.
8.1. Сигнализация противодавления
Сигнал о противодавлении посылается сущностями, которые получают данные. Веб-потоки имеют 2 таких сущности:
WS
получает данные через методwrite
объектаWriter
;RS
получает данные при вызове методаenqueue
объектаReadableStreamDefaultController
базового источника.
В обоих случаях входные данные буферизуются с помощью очередей. Сигнал о противодавлении возникает при заполнении очереди. Как его можно обнаружить?
Вот где находятся очереди:
- очередь
WS
хранится внутриWritableStreamDefaultController
; - очередь
RS
хранится внутриReadableStreamDefaultController
.
Желаемый размер (desired size) — это число, означающее количество свободного места (пространства, комнаты) (room) в очереди:
- если свободное место имеется, число положительное;
- если места нет, число равно
0
; - если очередь переполнена, число отрицательное.
Поэтому противодавление применяется, когда желаемый размер <= 0
. Размер доступен через геттер desiredSize
объекта, содержащего очередь.
Как вычисляется желаемый размер? Через объект, определяющий так называемую стратегию помещения в очередь (queuing strategy). RS
и WS
имеют дефолтные стратегии помещения в очередь, которые могут быть перезаписаны через опциональный параметр их конструкторов. Интерфейс QueuingStrategy содержит 2 свойства:
- метод
size(chunk)
возвращает размерchunk
;
- текущий размер очереди — это сумма размеров содержащихся в ней чанков;
- свойство
highWaterMark
(верхняя отметка) определяет максимальный размер очереди.
Желаемый размер очереди — это верхняя отметка минус текущий размер очереди:
desiredSize = highWaterMark - [chunks].reduce((x, y) => x + y, 0);
8.2. Обработка противодавления
Сущности, отправляющие данные, должны реагировать на противодавление.
8.2.1. Код, записывающий данные в WS
через Writer
- Мы можем ждать разрешения промиса в
writer.ready
. В это время мы заблокированы, и достигается противодавление. Промис разрешается при появлении свободного места в очереди. Разрешение запускается, когда значениеwriter.desiredSize
становится пложительным; - в качестве альтернативы, можно ждать разрешения промиса, возвращаемого
writer.write()
. В это время очередь не заполняется.
Также имеется возможность указывать размер чанков в writer.desiredSize
.
8.2.2. Базовый источник RS
Объект базового источника, который передается RS
, оборачивает внешний источник. Он является звеном цепочки, располагающимся перед RS
.
- Базовые pull-источники запрашивают новые данные при появлении комнаты в очереди. При отсутствии комнаты противодавление выполняется автоматически, поскольку данные не запрашиваются;
- базовые push-источники должны проверять
controller.desiredSize
после помещения чего-либо в очередь: если желаемый размер<=0
, они должны выполнять противодавление путем приостановки их внешних источников.
8.2.3. Базовый приемник WS
Объект базового приемника, передаваемый в WS
, оборачивает внешний приемник. Он является звеном цепочки, располагающимся после WS
.
Каждый внешний источник сообщает о противодавлении по-разному (в некоторых случаях этого вообще не происходит). Базовый источник может оказывать противодавление путем возврата промиса из метода write
, который разрешается после завершения записи. В стандарте веб-потоков имеется пример того, как это работает.
8.2.4. TS
(writable-readable)
TS
подключает сторону для записи к стороне для чтения путем реализации базового приемника для первой и базового источника для второй. Он имеет внутренний слот [[backpressure]]
, который является индикатором активности противодавления.
- Метод
write
базового приемника стороны для записи асинхронно ожидает завершения внутреннего противодавления перед передачей очередного чанка преобразователюTS
(TransformStreamDefaultSinkWriteAlgorithm). Преобразователь может использовать очередь черезTransformStreamDefaultController
. Обратите внимание:write()
возвращает промис, который разрешается после выполнения метода. До разрешения этого промисаWS
буферизует входящие запросы на запись с помощью очереди. Поэтому противодавление стороны для записи реализуется через очередь и ее желаемый размер; - противодавление
TS
активируется, если чанк помещается в очередь с помощьюTransformStreamDefaultController
и очередь стороны для чтения становится полной ((TransformStreamDefaultControllerEnqueue)[https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue]); - противодавление
TS
может быть деактивировано, если что-либо читается изReader
((ReadableStreamDefaultReaderRead)[https://streams.spec.whatwg.org/#readable-stream-default-reader-read]);
- если в очереди имеются комнаты, возможно, пришло время вызвать
pull()
(([[PullSteps]]
)[https://streams.spec.whatwg.org/#rs-default-controller-private-pull]); pull()
базового источника стороны для чтения деактивирует противодавление ((TransformStreamDefaultSourcePullAlgorithm)[https://streams.spec.whatwg.org/#transform-stream-default-source-pull]).
- если в очереди имеются комнаты, возможно, пришло время вызвать
8.2.5. pipeTo()
(RS -> WS)
pipeTo()
читает чанки из RS
через Reader
и записывает их в WS
через Writer
. Он приостанавливается, когда writer.desiredSize <= 0
(шаг 15 (ReadableStreamPipeTo)[https://streams.spec.whatwg.org/#readable-stream-pipe-to]).
9. Потоки байтов
До сих пор мы говорили о текстовых потоках, потоках, чанки которых представляют собой строки. Но Web streams API
также поддерживает потоки байтов для двоичных данных, где чанки являются Uint8Arrays
(типизированными массивами):
RS
имеют специальный режим (mode)bytes
;- для
WS
неважно, чем являются чанки, строками илиUint8Arrays
. Следовательно, каким потоком является экземплярWS
зависит от того, что способен обрабатывать базовый приемник; - какой вид чанков способен обрабатывать
TS
также зависит от егоTransformer
.
Посмотрим, как создать доступный для чтения поток байтов.
9.1. Доступный для чтения поток байтов
Тип создаваемого конструктором RS
потока зависит от опционального свойства type
его опционального первого параметра underlyingSource
:
- если
type
не указан илиunderlyingSource
не передан, создается текстовый поток; - если
type
имеет значение"bytes"
(строка), новый экземпляр будет представлять собой поток байтов:
const readableByteStream = new RS({
type: "bytes",
async start() {}
// ...
});
На что влияет режим bytes
?
В дефолтном режиме базовый источник может возвращать любой вид чанков. В "байтовом" режиме чанки должны быть ArrayBufferViews
, т.е. TypedArrays
(такими как Uint8Arrays
) или DataViews
.
Доступный для чтения поток байтов может создавать 2 вида Reader
:
getReader()
возвращает экземплярReadableStreamDefaultReader
;getReader({ mode: "byob" })
возвращает экземплярReadableStreamBYOBReader
.
BYOB
расшифровывается как Bring Your Own Buffer
(предоставьте собственный буфер) и означает, что мы может передать буфер (ArrayBufferView
) в reader.read()
. После этого данный ArrayBufferView
отключается и больше не используется. Однако read()
возвращает данные в виде нового ArrayBufferView
, который имеет тот же тип и обращается к той же области того же самого ArrayBuffer
.
Кроме того, доступные для чтения потоки байтов имеют разные контроллеры: они являются экземплярами ReadableByteStreamController
(а не ReadableStreamDefaultController
). Помимо принуждения (forcing) базовых источников помещать в очередь ArrayBufferViews
(TypedArrays
или DataViews
), они поддерживают ReadableStreamBYOBReaders
через свойство byobRequest. В стандарте веб-потоков приведено 2 примера использования byobRequest
.
9.2. Пример: бесконечный доступный для чтения поток байтов, заполняемый произвольными данными
В следующем примере создается бесконечный доступный для чтения поток байтов, заполняющий чанки произвольными данными:
import { promisify } from "node:util";
import { randomFill } from "node:crypto";
const asyncRandomFill = promisify(randomFill);
const readableByteStream = new RS({
type: "bytes",
async pull(controller) {
const byobRequest = controller.byobRequest;
await asyncRandomFill(byobRequest.view);
byobRequest.respond(byobRequest.view.byteLength);
},
});
const reader = readableByteStream.getReader({ mode: "byob" });
const buffer = new Uint8Array(10); // (1)
const firstChunk = await reader.read(buffer); // (2)
console.log(firstChunk);
Поскольку readableByteStream
является бесконечным, мы не можем перебрать его в цикле. Поэтому читается только его первый чанк (2).
Созданный буфер (1) передается в reader.read()
(2) и после этого становится недоступным для чтения.
9.3. Пример: сжатие доступного для чтения потока байтов
В следующем примере создается доступный для чтения поток байтов, который пропускается через поток, сжимающий данные в формат GZIP
:
const readableByteStream = new RS({
type: "bytes",
start(controller) {
// 256 нулей
controller.enqueue(new Uint8Array(256));
controller.close();
},
});
const transformedStream = readableByteStream.pipeThrough(
new CompressionStream("gzip"));
await logChunks(transformedStream);
async function logChunks(readableByteStream) {
const reader = transformedStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
} finally {
reader.releaseLock();
}
}
9.4. Пример: чтение веб-страницы с помощью fetch()
Результат fetch()
разрешается объектом ответа, свойство body
которого представляет собой доступный для чтения поток байтов. Данный поток преобразуется в текстовый поток с помощью TextDecoderStream
:
const response = await fetch("https://example.com");
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
new TextDecoderStream("utf-8"));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
10. Специфичные для Node.js
вспомогательные функции
Node.js
— единственная на сегодняшний день платформа, поддерживающая следующие вспомогательные функции, именуемые utility consumers (утилитами потребления?):
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from "node:stream/consumers";
Эти функции преобразуют веб RS
, Readables
Node.js
и AsyncIterators
в промисы, которые разрешаются:
ArrayBuffers
—arrayBuffer()
;Blobs
—blob()
;- буферами
Node.js
—buffer()
; - объектами
JSON
—json()
; - строками —
text()
.
Предполагается, что бинарные данные имеют кодировку UTF-8
:
import * as streamConsumers from "node:stream/consumers";
const readableByteStream = new RS({
type: "bytes",
start(controller) {
// TextEncoder преобразует строки в Uint8Arrays в кодировке UTF-8
const encoder = new TextEncoder();
const view = encoder.encode(`"