Streams. Потоки в Node js

Streams. Потоки в Node js

Node js 30.03.2018

Streams — это концепция для передачи данных с одной программы в другую в операциях ввода/вывода.

Streams позволяют передавать данные небольшими порциями, с помощью чего мы можем работать с довольно большими объемами файлов. При работе с потоками мы не расходуем много памяти т.к. нам не нужно загружать весь файл в память, а только часть файла для буферизации.

Виды потоков

  • Readable — поток для считывания
  • Writable — поток для записи
  • Duplex — это поток в котором можно и читать данные и записывать, где эти 2 процесcа происходят независимо друг от друга
  • Transform — разновидность Duplex потока, которые могут изменять данные при их записи/чтению

Поток имеет внутренний буфер (Back Buffer) для временного хранения данных, которые были полдучены, пока они не будут обработаны соответствующим способом (методом read() или write() в соответствии с видом потока).

Размер буфера можно цказать через параметр highWaterMark, где смысл этого параметра устанавливается опцией objectMode.

new StreamObject({objectMode: false, highWaterMark: кол_во_байт}); //по умолчанию 16384 (16kb)
new StreamObject({objectMode: true, highWaterMark: кол_во_объектов});//по умолчанию  16

Readable stream

Events

  • readable — когда стрим готов читать со своего внутреннего буфера
  • error — при ошибке
  • end — когда считывать больше нечего

Methods

  • read() — считать часть даты с внутреннего буфера
  • read(N) — считать кусок размером в N байт

В Readable stream данные буферизуются, пока не будет вызван метод  read(). Как только общий размер внутреннего буфера достигнет порогового значения, указанного в highWaterMark, поток временно прекратит чтение данных.

const fs = require('fs');

const streamRead = new fs.ReadStream(_filename); // _filename - ссылка на файл для чтения

streamRead.on('readable', function() {
  let data = streamRead.read();
  if (data !== null) {
    console.log(data); // фрагмент данных в виде буфера
    if (data !== null) {
        console.log(data.length); // длина прочитанного фрагмента файла
        console.log(data.toString()); // преобразование данных с буфера в строку
    }
  }
});

streamRead.on('end', () => {
  console.log('The end');
})

В этом примере мы поставили библиотеку fs для создания стрима и читаем данные из текстового файла. fs.ReadStream реализует стандартный поток чтения, который наследует от stream.Readable. Мы создали поток и он пытается прочитать данные с файла, когда он что-то прочитал — имитирует событие ‘readable’. Это событие оглашает, что данные прочитаны и находятся во внутреннем буфере. После, при помощи метода read() мы получаем данные из внутреннего буфера и уже можем его обработать.

Если data === null — значит данные на считывание закончились.

Writable stream

Writable stream тоже оснащен внутренним буфером. Сначала данные попадают во внутренний буфер, после записываются и внутренний буфер наполняется опять. Таким образом потерь при записи данных не происходит, т.к. процесс получения данных может происходить быстрее, чем процесс записи.

Events

  • drain — когда внутренний буфер свободен для получения новых данных
  • error — при ошибке
  • finish — когда был вызван метод  end() и все данные с внутреннего буфера были записаны

Methods

  • write() — запись данных
  • end() — закончить процесс

Создадим простой стрим для записи данных — 2 в степени 0, 2 в степени 1 и так до 10-й степени. Когда степень будет равна 10 — записываем полученное значение и закрываем стрим.

const fs = require('fs');
const streamWrite = new fs.WriteStream(_filename); // _filename - имя файла

for (let i = 0; i <= 10; i++) {
  let data = `${2 ** i} | `;
  streamWrite.write(data); // записываем данные в файл
  if (i == 10) {
    streamWrite.end(); 
  } 
} 

streamWrite.on('finish', () => {
  console.log('The end');
})

Здесь мы создаем стрим для записи через fs.WriteStream, производим вычисление данных data при помощи цикла и записываем данные методом write. Когда нам больше нечего записывать, вызываем метод end. Если данные с внутреннего буфера записались все — срабатывает событие finish и выводиться сообщение в консоль.

Ничего сложного. Пойдем далее. Мы уже умеем писать стримы для записи и считывания так давайте сделаем микс — читаем данные, ищем нужные значения, если подходят — записываем.

const fs = require('fs');
const _filenameWrite = './demo/wr.txt';
const _filenameRead = './demo/demo.txt';

// создаем 2 стрима - на запись и считывание
const streamRead = new fs.ReadStream(_filenameRead);
const streamWrite = new fs.WriteStream(_filenameWrite);

streamRead.on('readable', function() {
  if (data !== null) {
    // ищем есть ли в полученом блоке нужные строки, если находим - записываем в новый файл
    if (data.includes('Понтий Пилат')) {
      streamWrite.write(data); // записываем данные в файл
    }
  }
});

streamRead.on('end', () => {
  console.log('The end read');
  streamWrite.end();
})

streamWrite.on('finish', () => {
  console.log('The end write');
})

Pipe

Pipe — это канал, который связывает поток для чтения и поток для записи. Он позволяет сразу передавать данные с чтения на запись и контролирует, чтоб к моменту передачи новых данных для записи, предыдущие были уже записаны.

const fs = require('fs');
const _filenameWrite = './demo/wr.txt';
const _filenameRead = './demo/demo.txt';

// создаем 2 стрима - на запись и считывание
const streamRead = new fs.createReadStream(_filenameRead);
const streamWrite = new fs.createWriteStream(_filenameWrite);
 
streamRead.pipe(streamWrite);

У потока чтения вызывается метод pipe(), в который передается поток для записи.

Поделиться:

Отправить ответ

Оставьте первый комментарий!

avatar
  Subscribe  
Notify of