Streams. Потоки в Node js
Streams — это концепция для передачи данных с одной программы в другую в операциях ввода/вывода.
Streams позволяют передавать данные небольшими порциями, с помощью чего мы можем работать с довольно большими объемами файлов. При работе с потоками мы не расходуем много памяти т.к. нам не нужно загружать весь файл в память, а только часть файла для буферизации.
Виды потоков
- Readable — поток для считывания
- Writable — поток для записи
- Duplex — это поток в котором можно и читать данные и записывать, где эти 2 процесса происходят независимо друг от друга
- Transform — разновидность Duplex потока, которые могут изменять данные при их записи/чтению
Поток имеет внутренний буфер (Back Buffer) для временного хранения данных, которые были получены, пока они не будут обработаны соответствующим способом (методом read() или write() в соответствии с видом потока).
Размер буфера можно указать через параметр highWaterMark, где смысл этого параметра устанавливается опцией objectMode.
new StreamObject({objectMode: false, highWaterMark: кол_во_байт}); //по умолчанию 16384 (16kb)
new StreamObject({objectMode: true, highWaterMark: кол_во_объектов});//по умолчанию 16
Readable stream
Events
- readable — когда стрим готов читать со своего внутреннего буфера
- error — при ошибке
- end — когда считывать больше нечего
Methods
- read() — считать часть даты с внутреннего буфера
- read(N) — считать кусок размером в N байт
В Readable stream данные буферизуются, пока не будет вызван метод read(). Как только общий размер внутреннего буфера достигнет порогового значения, указанного в highWaterMark, поток временно прекратит чтение данных.
const fs = require('fs');
const streamRead = new fs.ReadStream(_filename); // _filename - ссылка на файл для чтения
streamRead.on('readable', function() {
let data = streamRead.read();
if (data !== null) {
console.log(data); // фрагмент данных в виде буфера
if (data !== null) {
console.log(data.length); // длина прочитанного фрагмента файла
console.log(data.toString()); // преобразование данных с буфера в строку
}
}
});
streamRead.on('end', () => {
console.log('The end');
})
В этом примере мы поставили библиотеку fs для создания стрима и читаем данные из текстового файла. fs.ReadStream реализует стандартный поток чтения, который наследует от stream.Readable. Мы создали поток и он пытается прочитать данные с файла, когда он что-то прочитал — имитирует событие ‘readable’. Это событие оглашает, что данные прочитаны и находятся во внутреннем буфере. После, при помощи метода read() мы получаем данные из внутреннего буфера и уже можем его обработать.
Если data === null — значит данные на считывание закончились.
Writable stream
Writable stream тоже оснащен внутренним буфером. Сначала данные попадают во внутренний буфер, после записываются и внутренний буфер наполняется опять. Таким образом потерь при записи данных не происходит, т.к. процесс получения данных может происходить быстрее, чем процесс записи.
Events
- drain — когда внутренний буфер свободен для получения новых данных
- error — при ошибке
- finish — когда был вызван метод end() и все данные с внутреннего буфера были записаны
Methods
- write() — запись данных
- end() — закончить процесс
Создадим простой стрим для записи данных — 2 в степени 0, 2 в степени 1 и так до 10-й степени. Когда степень будет равна 10 — записываем полученное значение и закрываем стрим.
const fs = require('fs');
const streamWrite = new fs.WriteStream(_filename); // _filename - имя файла
for (let i = 0; i <= 10; i++) {
let data = `${2 ** i} | `;
streamWrite.write(data); // записываем данные в файл
if (i == 10) {
streamWrite.end();
}
}
streamWrite.on('finish', () => {
console.log('The end');
})
Здесь мы создаем стрим для записи через fs.WriteStream, производим вычисление данных data при помощи цикла и записываем данные методом write. Когда нам больше нечего записывать, вызываем метод end. Если данные с внутреннего буфера записались все — срабатывает событие finish и выводиться сообщение в консоль.
Ничего сложного. Пойдем далее. Мы уже умеем писать стримы для записи и считывания так давайте сделаем микс — читаем данные, ищем нужные значения, если подходят — записываем.
const fs = require('fs');
const _filenameWrite = './demo/wr.txt';
const _filenameRead = './demo/demo.txt';
// создаем 2 стрима - на запись и считывание
const streamRead = new fs.ReadStream(_filenameRead);
const streamWrite = new fs.WriteStream(_filenameWrite);
streamRead.on('readable', function() {
if (data !== null) {
// ищем есть ли в полученном блоке нужные строки, если находим - записываем в новый файл
if (data.includes('Понтий Пилат')) {
streamWrite.write(data); // записываем данные в файл
}
}
});
streamRead.on('end', () => {
console.log('The end read');
streamWrite.end();
})
streamWrite.on('finish', () => {
console.log('The end write');
})
Pipe
Pipe — это канал, который связывает поток для чтения и поток для записи. Он позволяет сразу передавать данные с чтения на запись и контролирует, чтоб к моменту передачи новых данных для записи, предыдущие были уже записаны.
const fs = require('fs');
const _filenameWrite = './demo/wr.txt';
const _filenameRead = './demo/demo.txt';
// создаем 2 стрима - на запись и считывание
const streamRead = new fs.createReadStream(_filenameRead);
const streamWrite = new fs.createWriteStream(_filenameWrite);
streamRead.pipe(streamWrite);
У потока чтения вызывается метод pipe(), в который передается поток для записи.