返回首页

说说对Node中的Stream的理解?应用场景?

问题解析

Stream是Node.js处理流式数据的核心抽象。面试官通过此题考察对流式处理概念的理解,以及在实际场景中如何高效处理大数据。理解Stream对于编写高性能、低内存占用的Node.js应用至关重要。

核心概念

什么是Stream

Stream是Node.js中处理流式数据的抽象接口。数据不是一次性读入内存,而是分成小块(chunk)逐块处理。这就像水流通过管道,数据从源头流向目的地,中间可以经过多个处理环节。

┌─────────────────────────────────────────────────────────────────┐
│                      Stream 数据流动                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────┐ │
│  │  Source  │─────▶│Transform │─────▶│Transform │─────▶│Destination│
│  │ (数据源)  │      │ (转换)   │      │ (转换)   │      │ (目标)  │
│  └──────────┘      └──────────┘      └──────────┘      └──────┘ │
│        │                  │                  │                   │
│        ▼                  ▼                  ▼                   │
│   ┌─────────┐        ┌─────────┐        ┌─────────┐              │
│   │ chunk 1 │        │ chunk 1 │        │ chunk 1 │              │
│   │ chunk 2 │        │ chunk 2 │        │ chunk 2 │              │
│   │ chunk 3 │        │ chunk 3 │        │ chunk 3 │              │
│   └─────────┘        └─────────┘        └─────────┘              │
│                                                                  │
│  特点:                                                          │
│  1. 分块处理,内存占用低                                          │
│  2. 响应速度快,收到第一块即可处理                                   │
│  3. 可组合,通过pipe连接多个流                                      │
└─────────────────────────────────────────────────────────────────┘

Stream的四种类型

┌─────────────────────────────────────────────────────────────┐
│                      Stream 类型                             │
├─────────────────┬───────────────────────────────────────────┤
│   Readable      │  可读流(数据源)                           │
│   (可读)         │  fs.createReadStream                      │
│                 │  http.IncomingMessage                     │
│                 │  process.stdin                            │
├─────────────────┼───────────────────────────────────────────┤
│   Writable      │  可写流(数据目标)                          │
│   (可写)         │  fs.createWriteStream                     │
│                 │  http.ServerResponse                      │
│                 │  process.stdout                           │
├─────────────────┼───────────────────────────────────────────┤
│   Duplex        │  双工流(可读可写)                          │
│   (双工)         │  net.Socket                               │
│                 │  tls.TLSSocket                            │
├─────────────────┼───────────────────────────────────────────┤
│   Transform     │  转换流(可读可写+数据转换)                   │
│   (转换)         │  zlib.createGzip()                        │
│                 │  crypto.createCipher()                    │
└─────────────────┴───────────────────────────────────────────┘

Stream vs 传统方式

const fs = require('fs');

// ========== 传统方式:一次性读入内存 ==========
// 问题:大文件会占用大量内存
// 4GB内存的服务器无法处理5GB的文件
fs.readFile('large-file.txt', (err, data) => {
  if (err) throw err;
  // data 是整个文件内容,可能占用数GB内存
  const result = processData(data);
  fs.writeFile('output.txt', result, callback);
});

// ========== Stream方式:分块处理 ==========
// 优点:内存占用稳定,无论文件多大
const readStream = fs.createReadStream('large-file.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  // 每次只处理一小块(默认64KB)
  const processed = processChunk(chunk);
  writeStream.write(processed);
});

// 或者使用pipe
fs.createReadStream('large-file.txt')
  .pipe(transformStream)
  .pipe(fs.createWriteStream('output.txt'));

详细解答

可读流(Readable)

const fs = require('fs');

// ========== 创建可读流 ==========
const readStream = fs.createReadStream('file.txt', {
  encoding: 'utf8',      // 编码,默认Buffer
  highWaterMark: 64 * 1024, // 缓冲区大小,默认64KB
  start: 0,              // 起始位置
  end: 1024              // 结束位置
});

// ========== 流动模式(Flowing Mode)==========
// 数据自动流动,通过data事件消费
readStream.on('data', (chunk) => {
  console.log('收到数据块:', chunk.length);
});

readStream.on('end', () => {
  console.log('读取完成');
});

readStream.on('error', (err) => {
  console.error('读取错误:', err);
});

// 暂停和恢复
readStream.pause();
setTimeout(() => readStream.resume(), 1000);

// ========== 暂停模式(Paused Mode)==========
// 通过read()手动读取
const pausedStream = fs.createReadStream('file.txt');
pausedStream.on('readable', () => {
  let chunk;
  while (null !== (chunk = pausedStream.read())) {
    console.log('读取到:', chunk.length);
  }
});

可写流(Writable)

const fs = require('fs');

// ========== 创建可写流 ==========
const writeStream = fs.createWriteStream('output.txt', {
  encoding: 'utf8',
  flags: 'w',           // 写入模式
  highWaterMark: 16 * 1024 // 缓冲区阈值
});

// ========== 写入数据 ==========
writeStream.write('Hello ', 'utf8');
writeStream.write('World\n', 'utf8');

// 标记结束
writeStream.end('Goodbye!\n');

// ========== 事件监听 ==========
writeStream.on('finish', () => {
  console.log('写入完成');
});

writeStream.on('error', (err) => {
  console.error('写入错误:', err);
});

// ========== 背压处理(Back Pressure)==========
// 当写入速度超过消费速度时,write返回false
const readStream = fs.createReadStream('big-input.txt');
const writeStream2 = fs.createWriteStream('big-output.txt');

readStream.on('data', (chunk) => {
  const canContinue = writeStream2.write(chunk);

  if (!canContinue) {
    // 缓冲区已满,暂停读取
    console.log('背压:暂停读取');
    readStream.pause();

    // 等待drain事件恢复
    writeStream2.once('drain', () => {
      console.log('背压解除:恢复读取');
      readStream.resume();
    });
  }
});

管道(Pipe)

const fs = require('fs');
const zlib = require('zlib');

// ========== 基本管道 ==========
// source.pipe(destination)
// 自动处理背压,自动关闭目标流
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));

// ========== 管道链 ==========
// 读取 → 压缩 → 写入
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));

// ========== pipeline(推荐)==========
const { pipeline } = require('stream');
const util = require('util');
const pipelineAsync = util.promisify(pipeline);

async function run() {
  try {
    await pipelineAsync(
      fs.createReadStream('input.txt'),
      zlib.createGzip(),
      fs.createWriteStream('output.txt.gz')
    );
    console.log('管道完成');
  } catch (err) {
    console.error('管道错误:', err);
  }
}

// ========== 自定义管道错误处理 ==========
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');

source.pipe(gzip).pipe(dest);

// 监听错误(pipe不自动转发错误)
source.on('error', handleError);
gzip.on('error', handleError);
dest.on('error', handleError);

function handleError(err) {
  console.error('流错误:', err);
  // 清理资源
  source.destroy();
  gzip.destroy();
  dest.destroy();
}

转换流(Transform)

const { Transform } = require('stream');

// ========== 创建自定义转换流 ==========
class UpperCaseTransform extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, callback) {
    // 转换逻辑
    const upperChunk = chunk.toString().toUpperCase();
    this.push(upperChunk);
    callback();
  }

  _flush(callback) {
    // 所有数据处理完毕后的清理工作
    console.log('转换完成');
    callback();
  }
}

// 使用
const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);

// ========== 行分割转换流 ==========
class LineSplitter extends Transform {
  constructor(options) {
    super(options);
    this.remainder = '';
  }

  _transform(chunk, encoding, callback) {
    const data = this.remainder + chunk.toString();
    const lines = data.split('\n');

    // 最后一行可能不完整,保存到remainder
    this.remainder = lines.pop();

    // 输出完整的行
    for (const line of lines) {
      this.push(line + '\n');
    }

    callback();
  }

  _flush(callback) {
    if (this.remainder) {
      this.push(this.remainder);
    }
    callback();
  }
}

// ========== JSON解析转换流 ==========
class JSONParser extends Transform {
  _transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      this.push(JSON.stringify(obj, null, 2) + '\n');
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

双工流(Duplex)

const { Duplex } = require('stream');

// ========== 创建双工流 ==========
class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];
  }

  // 可读端实现
  _read(size) {
    // 从某处获取数据并push
    if (this.data.length > 0) {
      this.push(this.data.shift());
    } else {
      this.push(null); // 结束
    }
  }

  // 可写端实现
  _write(chunk, encoding, callback) {
    // 处理写入的数据
    this.data.push(chunk);
    callback();
  }
}

// 实际应用:TCP Socket
const net = require('net');
const server = net.createServer((socket) => {
  // socket是Duplex流
  socket.on('data', (data) => {
    console.log('收到:', data.toString());
  });

  socket.write('Hello Client\n');
});

深入理解

流的内部机制

// ========== 缓冲区管理 ==========
const fs = require('fs');

const stream = fs.createReadStream('file.txt', {
  highWaterMark: 16 * 1024 // 16KB缓冲区
});

// 内部缓冲区状态
stream.on('data', (chunk) => {
  console.log({
    缓冲区长度: stream.readableLength,
    是否暂停: stream.isPaused(),
    数据块大小: chunk.length
  });
});

// ========== 对象模式 ==========
const { Readable } = require('stream');

// 默认模式传输Buffer/字符串
// 对象模式可以传输任意JavaScript对象
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'Alice' });
    this.push({ id: 2, name: 'Bob' });
    this.push(null);
  }
});

objectStream.on('data', (obj) => {
  console.log('收到对象:', obj);
});

Stream的实现原理

const { Readable, Writable, Transform } = require('stream');

// ========== 简化版Readable实现原理 ==========
class SimpleReadable extends Readable {
  constructor(data, options) {
    super(options);
    this.data = data;
    this.index = 0;
  }

  _read(size) {
    // size是建议读取的字节数
    const chunk = this.data.slice(this.index, this.index + size);
    this.index += chunk.length;

    // push将数据加入缓冲区
    // push(null)表示结束
    this.push(chunk.length > 0 ? chunk : null);
  }
}

// ========== 简化版Writable实现原理 ==========
class SimpleWritable extends Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
  }

  _write(chunk, encoding, callback) {
    // 实际写入操作
    this.chunks.push(chunk);

    // 模拟异步写入
    setImmediate(() => {
      callback(); // 调用callback表示完成
      // callback(err) 表示出错
    });
  }

  _final(callback) {
    // 所有数据写入后的清理
    console.log('总共写入:', Buffer.concat(this.chunks).length);
    callback();
  }
}

// ========== 简化版Transform实现原理 ==========
class SimpleTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // 1. 处理输入数据
    const processed = this.process(chunk);

    // 2. push输出数据
    this.push(processed);

    // 3. 调用callback表示完成
    callback();

    // 或者 callback(err) 表示出错
  }

  process(chunk) {
    return chunk.toString().toUpperCase();
  }
}

流的错误处理

const fs = require('fs');
const { pipeline } = require('stream');

// ========== 错误处理问题 ==========
// pipe不会自动转发错误!
const readStream = fs.createReadStream('non-existent.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.pipe(writeStream);

// 必须分别监听错误
readStream.on('error', (err) => {
  console.error('读取错误:', err);
  writeStream.destroy(); // 清理
});

writeStream.on('error', (err) => {
  console.error('写入错误:', err);
});

// ========== 使用pipeline(自动错误处理)==========
pipeline(
  fs.createReadStream('input.txt'),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('管道错误:', err);
    } else {
      console.log('管道成功');
    }
  }
);

// ========== 使用async/await ==========
const util = require('util');
const pipelineAsync = util.promisify(pipeline);

async function copyFile(source, dest) {
  try {
    await pipelineAsync(
      fs.createReadStream(source),
      fs.createWriteStream(dest)
    );
  } catch (err) {
    console.error('复制失败:', err);
    throw err;
  }
}

应用场景

1. 文件操作

const fs = require('fs');
const crypto = require('crypto');

// ========== 大文件复制 ==========
function copyLargeFile(source, dest) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(source);
    const writeStream = fs.createWriteStream(dest);

    readStream.on('error', reject);
    writeStream.on('error', reject);
    writeStream.on('finish', resolve);

    readStream.pipe(writeStream);
  });
}

// ========== 文件校验(流式计算hash)==========
function hashFile(filename) {
  return new Promise((resolve, reject) => {
    const hash = crypto.createHash('sha256');
    const stream = fs.createReadStream(filename);

    stream.on('error', reject);
    stream.on('data', (chunk) => hash.update(chunk));
    stream.on('end', () => resolve(hash.digest('hex')));
  });
}

// ========== 大文件搜索 ==========
const readline = require('readline');

async function searchInFile(filename, keyword) {
  const stream = fs.createReadStream(filename);
  const rl = readline.createInterface({
    input: stream,
    crlfDelay: Infinity
  });

  const results = [];
  let lineNumber = 0;

  for await (const line of rl) {
    lineNumber++;
    if (line.includes(keyword)) {
      results.push({ lineNumber, line });
    }
  }

  return results;
}

2. HTTP服务

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

// ========== 流式响应 ==========
const server = http.createServer((req, res) => {
  // 直接管道文件到响应
  const readStream = fs.createReadStream('large-file.txt');

  // 支持gzip压缩
  const acceptEncoding = req.headers['accept-encoding'] || '';

  if (acceptEncoding.includes('gzip')) {
    res.writeHead(200, { 'Content-Encoding': 'gzip' });
    readStream.pipe(zlib.createGzip()).pipe(res);
  } else {
    readStream.pipe(res);
  }

  // 错误处理
  readStream.on('error', (err) => {
    res.statusCode = 500;
    res.end('Server Error');
  });
});

// ========== 流式请求处理 ==========
http.createServer((req, res) => {
  if (req.method === 'POST') {
    let size = 0;

    req.on('data', (chunk) => {
      size += chunk.length;
      console.log(`收到 ${chunk.length} 字节,总计 ${size}`);
      // 可以边接收边处理
    });

    req.on('end', () => {
      res.end(`接收完成,共 ${size} 字节`);
    });
  }
});

3. 数据转换管道

const { Transform, pipeline } = require('stream');
const fs = require('fs');

// ========== CSV转JSON流 ==========
class CSVToJSON extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.remainder = '';
  }

  _transform(chunk, encoding, callback) {
    const data = this.remainder + chunk.toString();
    const lines = data.split('\n');
    this.remainder = lines.pop();

    for (const line of lines) {
      if (!this.headers) {
        this.headers = line.split(',');
      } else {
        const values = line.split(',');
        const obj = {};
        this.headers.forEach((h, i) => {
          obj[h.trim()] = values[i]?.trim();
        });
        this.push(obj);
      }
    }

    callback();
  }

  _flush(callback) {
    if (this.remainder) {
      const values = this.remainder.split(',');
      const obj = {};
      this.headers.forEach((h, i) => {
        obj[h.trim()] = values[i]?.trim();
      });
      this.push(obj);
    }
    callback();
  }
}

// 使用
pipeline(
  fs.createReadStream('data.csv'),
  new CSVToJSON(),
  new Transform({
    objectMode: true,
    transform(obj, enc, cb) {
      this.push(JSON.stringify(obj) + '\n');
      cb();
    }
  }),
  fs.createWriteStream('data.json'),
  (err) => {
    if (err) console.error(err);
    else console.log('转换完成');
  }
);

4. 打包工具底层

const { Transform } = require('stream');
const fs = require('fs');
const path = require('path');

// ========== 简单文件打包器 ==========
class FilePacker extends Transform {
  constructor() {
    super();
    this.files = [];
  }

  addFile(filename) {
    this.files.push(filename);
  }

  _transform(chunk, encoding, callback) {
    // 实际实现会读取文件并打包
    callback();
  }

  _flush(callback) {
    // 写入包头部信息
    // [文件数量: 4字节][文件名长度: 2字节][文件名][文件大小: 4字节][文件内容]...

    let offset = 0;
    const header = Buffer.alloc(4);
    header.writeUInt32BE(this.files.length, 0);
    this.push(header);
    offset += 4;

    for (const file of this.files) {
      const content = fs.readFileSync(file);
      const nameBuf = Buffer.from(path.basename(file));

      // 文件名长度
      const nameLenBuf = Buffer.alloc(2);
      nameLenBuf.writeUInt16BE(nameBuf.length, 0);
      this.push(nameLenBuf);

      // 文件名
      this.push(nameBuf);

      // 文件大小
      const sizeBuf = Buffer.alloc(4);
      sizeBuf.writeUInt32BE(content.length, 0);
      this.push(sizeBuf);

      // 文件内容
      this.push(content);
    }

    callback();
  }
}

最佳实践

1. 始终处理错误

const fs = require('fs');
const { pipeline } = require('stream');

// 推荐:使用pipeline
pipeline(
  fs.createReadStream('input.txt'),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('管道失败:', err);
    }
  }
);

// 不推荐:pipe不处理错误
// fs.createReadStream('input.txt').pipe(fs.createWriteStream('output.txt'));

2. 使用async/await

const util = require('util');
const stream = require('stream');
const pipeline = util.promisify(stream.pipeline);

async function processFiles() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      transformStream,
      fs.createWriteStream('output.txt')
    );
    console.log('处理完成');
  } catch (err) {
    console.error('处理失败:', err);
  }
}

3. 注意背压

const fs = require('fs');

// 手动处理背压
const readStream = fs.createReadStream('big.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  if (!writeStream.write(chunk)) {
    readStream.pause();
  }
});

writeStream.on('drain', () => {
  readStream.resume();
});

// 或使用pipe自动处理
readStream.pipe(writeStream);

4. 资源清理

const fs = require('fs');

const readStream = fs.createReadStream('file.txt');
const writeStream = fs.createWriteStream('output.txt');

function cleanup() {
  readStream.destroy();
  writeStream.destroy();
}

readStream.on('error', (err) => {
  console.error('读取错误:', err);
  cleanup();
});

writeStream.on('error', (err) => {
  console.error('写入错误:', err);
  cleanup();
});

process.on('SIGINT', cleanup);

面试要点

  1. 核心概念:Stream是分块处理数据的抽象,有Readable、Writable、Duplex、Transform四种类型
  2. 管道机制:source.pipe(dest)自动处理数据流动和背压
  3. 背压处理:当消费速度慢于生产速度时,通过pause/resume或drain事件控制
  4. 错误处理:pipe不自动转发错误,应使用pipeline或分别监听error事件
  5. 应用场景:大文件处理、数据转换、HTTP流式传输、压缩加密
  6. 性能优势:内存占用低,响应延迟小,适合处理大数据

常见追问

  • Q: Stream和传统readFile有什么区别?

    • A: Stream分块处理,内存占用稳定;readFile一次性读入内存,大文件会OOM
  • Q: 什么是背压(Back Pressure)?

    • A: 当写入缓冲区满时,write返回false,应暂停读取直到drain事件
  • Q: pipe和pipeline有什么区别?

    • A: pipeline是pipe的增强版,自动处理错误和流关闭,推荐使用