说说对Node中的Stream的理解?应用场景?
问题解析
Stream是Node.js处理流式数据的核心抽象。面试官通过此题考察对流式处理概念的理解,以及在实际场景中如何高效处理大数据。理解Stream对于编写高性能、低内存占用的Node.js应用至关重要。
核心概念
什么是Stream
Stream是Node.js中处理流式数据的抽象接口。数据不是一次性读入内存,而是分成小块(chunk)逐块处理。这就像水流通过管道,数据从源头流向目的地,中间可以经过多个处理环节。
┌─────────────────────────────────────────────────────────────────┐
│ Stream 数据流动 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ Source │─────▶│Transform │─────▶│Transform │─────▶│Destination│
│ │ (数据源) │ │ (转换) │ │ (转换) │ │ (目标) │
│ └──────────┘ └──────────┘ └──────────┘ └──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ chunk 1 │ │ chunk 1 │ │ chunk 1 │ │
│ │ chunk 2 │ │ chunk 2 │ │ chunk 2 │ │
│ │ chunk 3 │ │ chunk 3 │ │ chunk 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 特点: │
│ 1. 分块处理,内存占用低 │
│ 2. 响应速度快,收到第一块即可处理 │
│ 3. 可组合,通过pipe连接多个流 │
└─────────────────────────────────────────────────────────────────┘
Stream的四种类型
┌─────────────────────────────────────────────────────────────┐
│ Stream 类型 │
├─────────────────┬───────────────────────────────────────────┤
│ Readable │ 可读流(数据源) │
│ (可读) │ fs.createReadStream │
│ │ http.IncomingMessage │
│ │ process.stdin │
├─────────────────┼───────────────────────────────────────────┤
│ Writable │ 可写流(数据目标) │
│ (可写) │ fs.createWriteStream │
│ │ http.ServerResponse │
│ │ process.stdout │
├─────────────────┼───────────────────────────────────────────┤
│ Duplex │ 双工流(可读可写) │
│ (双工) │ net.Socket │
│ │ tls.TLSSocket │
├─────────────────┼───────────────────────────────────────────┤
│ Transform │ 转换流(可读可写+数据转换) │
│ (转换) │ zlib.createGzip() │
│ │ crypto.createCipheriv() │
└─────────────────┴───────────────────────────────────────────┘
Stream vs 传统方式
const fs = require('fs');
// ========== 传统方式:一次性读入内存 ==========
// 问题:大文件会占用大量内存
// 4GB内存的服务器无法处理5GB的文件
fs.readFile('large-file.txt', (err, data) => {
if (err) throw err;
// data 是整个文件内容,可能占用数GB内存
const result = processData(data);
fs.writeFile('output.txt', result, callback);
});
// ========== Stream方式:分块处理 ==========
// 优点:内存占用稳定,无论文件多大
const readStream = fs.createReadStream('large-file.txt');
const writeStream = fs.createWriteStream('output.txt');
readStream.on('data', (chunk) => {
// 每次只处理一小块(默认64KB)
const processed = processChunk(chunk);
writeStream.write(processed);
});
// 或者使用pipe
fs.createReadStream('large-file.txt')
.pipe(transformStream)
.pipe(fs.createWriteStream('output.txt'));
详细解答
可读流(Readable)
const fs = require('fs');
// ========== 创建可读流 ==========
const readStream = fs.createReadStream('file.txt', {
encoding: 'utf8', // 编码,默认Buffer
highWaterMark: 64 * 1024, // 缓冲区大小,默认64KB
start: 0, // 起始位置
end: 1024 // 结束位置
});
// ========== 流动模式(Flowing Mode)==========
// 数据自动流动,通过data事件消费
readStream.on('data', (chunk) => {
console.log('收到数据块:', chunk.length);
});
readStream.on('end', () => {
console.log('读取完成');
});
readStream.on('error', (err) => {
console.error('读取错误:', err);
});
// 暂停和恢复
readStream.pause();
setTimeout(() => readStream.resume(), 1000);
// ========== 暂停模式(Paused Mode)==========
// 通过read()手动读取
const pausedStream = fs.createReadStream('file.txt');
pausedStream.on('readable', () => {
let chunk;
while (null !== (chunk = pausedStream.read())) {
console.log('读取到:', chunk.length);
}
});
可写流(Writable)
const fs = require('fs');
// ========== 创建可写流 ==========
const writeStream = fs.createWriteStream('output.txt', {
encoding: 'utf8',
flags: 'w', // 写入模式
highWaterMark: 16 * 1024 // 缓冲区阈值
});
// ========== 写入数据 ==========
writeStream.write('Hello ', 'utf8');
writeStream.write('World\n', 'utf8');
// 标记结束
writeStream.end('Goodbye!\n');
// ========== 事件监听 ==========
writeStream.on('finish', () => {
console.log('写入完成');
});
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});
// ========== 背压处理(Back Pressure)==========
// 当写入速度超过消费速度时,write返回false
const readStream = fs.createReadStream('big-input.txt');
const writeStream2 = fs.createWriteStream('big-output.txt');
readStream.on('data', (chunk) => {
const canContinue = writeStream2.write(chunk);
if (!canContinue) {
// 缓冲区已满,暂停读取
console.log('背压:暂停读取');
readStream.pause();
// 等待drain事件恢复
writeStream2.once('drain', () => {
console.log('背压解除:恢复读取');
readStream.resume();
});
}
});
管道(Pipe)
const fs = require('fs');
const zlib = require('zlib');
// ========== 基本管道 ==========
// source.pipe(destination)
// 自动处理背压,自动关闭目标流
fs.createReadStream('input.txt')
.pipe(fs.createWriteStream('output.txt'));
// ========== 管道链 ==========
// 读取 → 压缩 → 写入
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// ========== pipeline(推荐)==========
const { pipeline } = require('stream');
const util = require('util');
const pipelineAsync = util.promisify(pipeline);
async function run() {
try {
await pipelineAsync(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz')
);
console.log('管道完成');
} catch (err) {
console.error('管道错误:', err);
}
}
// ========== 自定义管道错误处理 ==========
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');
source.pipe(gzip).pipe(dest);
// 监听错误(pipe不自动转发错误)
source.on('error', handleError);
gzip.on('error', handleError);
dest.on('error', handleError);
function handleError(err) {
console.error('流错误:', err);
// 清理资源
source.destroy();
gzip.destroy();
dest.destroy();
}
转换流(Transform)
const { Transform } = require('stream');
// ========== 创建自定义转换流 ==========
// Transform stream that upper-cases every chunk passing through it.
class UpperCaseTransform extends Transform {
  constructor(options) {
    super(options);
  }
  // Called once per incoming chunk; hand the converted text downstream
  // via the callback's (err, data) form (equivalent to push + callback).
  _transform(chunk, encoding, done) {
    done(null, chunk.toString().toUpperCase());
  }
  // Runs after the final chunk, just before 'end' is emitted.
  _flush(done) {
    console.log('转换完成');
    done();
  }
}
// Usage: upper-case everything typed on stdin and echo it to stdout.
const toUpper = new UpperCaseTransform();
process.stdin.pipe(toUpper).pipe(process.stdout);
// ========== 行分割转换流 ==========
// Transform stream that re-chunks arbitrary input into whole lines.
class LineSplitter extends Transform {
  constructor(options) {
    super(options);
    // Holds the trailing partial line carried over between chunks.
    this.remainder = '';
  }
  _transform(chunk, encoding, done) {
    const text = this.remainder + chunk.toString();
    const pieces = text.split('\n');
    // The final piece may be an incomplete line — keep it for later.
    this.remainder = pieces.pop();
    // Emit every complete line, newline restored.
    pieces.forEach((line) => this.push(line + '\n'));
    done();
  }
  _flush(done) {
    // Emit whatever is left once the input ends (no trailing newline).
    if (this.remainder) {
      this.push(this.remainder);
    }
    done();
  }
}
// ========== JSON解析转换流 ==========
// Transform stream that pretty-prints each chunk of JSON text.
// NOTE(review): assumes every chunk is a complete JSON document —
// a document split across chunk boundaries will fail to parse.
class JSONParser extends Transform {
  _transform(chunk, encoding, done) {
    try {
      const parsed = JSON.parse(chunk.toString());
      this.push(JSON.stringify(parsed, null, 2) + '\n');
      done();
    } catch (err) {
      // Propagate the parse failure as a stream error.
      done(err);
    }
  }
}
双工流(Duplex)
const { Duplex } = require('stream');
// ========== 创建双工流 ==========
// A Duplex has an independent readable side (_read) and writable side
// (_write). This one queues written chunks and replays them to readers.
class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];              // chunks written but not yet read
    this.waitingForData = false; // readable side asked before data existed
  }
  // Readable side: supply the next queued chunk.
  _read(size) {
    if (this.data.length > 0) {
      this.push(this.data.shift());
    } else if (this.writableEnded) {
      // Queue drained and no more writes can arrive: signal EOF.
      // (The naive version pushed null whenever the queue was empty,
      // which ended the readable side prematurely while writes were
      // still possible.)
      this.push(null);
    } else {
      // Nothing to give yet; deliver when the next write lands.
      this.waitingForData = true;
    }
  }
  // Writable side: queue the chunk, satisfying a pending read if any.
  _write(chunk, encoding, callback) {
    this.data.push(chunk);
    if (this.waitingForData) {
      this.waitingForData = false;
      this.push(this.data.shift());
    }
    callback();
  }
  // Runs after end(): if a reader is still waiting, deliver EOF now.
  _final(callback) {
    if (this.waitingForData) {
      this.waitingForData = false;
      this.push(null);
    }
    callback();
  }
}
// 实际应用:TCP Socket
const net = require('net');
const server = net.createServer((conn) => {
  // The socket is a Duplex: read from the client...
  conn.on('data', (payload) => {
    console.log('收到:', payload.toString());
  });
  // ...and write back over the very same stream.
  conn.write('Hello Client\n');
});
深入理解
流的内部机制
// ========== 缓冲区管理 ==========
const fs = require('fs');
const stream = fs.createReadStream('file.txt', {
highWaterMark: 16 * 1024 // 16KB缓冲区
});
// 内部缓冲区状态
stream.on('data', (chunk) => {
console.log({
缓冲区长度: stream.readableLength,
是否暂停: stream.isPaused(),
数据块大小: chunk.length
});
});
// ========== 对象模式 ==========
const { Readable } = require('stream');
// 默认模式传输Buffer/字符串
// 对象模式可以传输任意JavaScript对象
const objectStream = new Readable({
  objectMode: true,
  read() {
    // Emit two records, then signal end-of-stream with null.
    for (const user of [{ id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }]) {
      this.push(user);
    }
    this.push(null);
  }
});
objectStream.on('data', (obj) => {
  console.log('收到对象:', obj);
});
Stream的实现原理
const { Readable, Writable, Transform } = require('stream');
// ========== 简化版Readable实现原理 ==========
// Readable that serves a fixed in-memory payload, `size` bytes at a time.
class SimpleReadable extends Readable {
  constructor(data, options) {
    super(options);
    this.data = data; // full payload to serve
    this.index = 0;   // read cursor into the payload
  }
  _read(size) {
    // `size` is only a hint for how much to produce per call.
    const piece = this.data.slice(this.index, this.index + size);
    this.index += piece.length;
    // push() queues data for consumers; push(null) signals the end.
    this.push(piece.length > 0 ? piece : null);
  }
}
// ========== 简化版Writable实现原理 ==========
// Writable that collects every written chunk in memory.
class SimpleWritable extends Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
  }
  _write(chunk, encoding, callback) {
    // The actual "write": stash the chunk.
    this.chunks.push(chunk);
    // Simulate an asynchronous sink; callback() reports completion,
    // callback(err) would report failure.
    setImmediate(callback);
  }
  _final(callback) {
    // Runs after end() — a good place for cleanup/summary work.
    console.log('总共写入:', Buffer.concat(this.chunks).length);
    callback();
  }
}
// ========== 简化版Transform实现原理 ==========
// Transform skeleton: 1. process input, 2. push output, 3. signal done.
class SimpleTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(this.process(chunk));
    // callback() signals completion; callback(err) would signal failure.
    callback();
  }
  // The per-chunk transformation, kept separate so it is easy to override.
  process(chunk) {
    return chunk.toString().toUpperCase();
  }
}
流的错误处理
const fs = require('fs');
const { pipeline } = require('stream');
// ========== The error-handling pitfall ==========
// pipe() does NOT forward errors from one stream to the next!
const readStream = fs.createReadStream('non-existent.txt');
const writeStream = fs.createWriteStream('output.txt');
readStream.pipe(writeStream);

// Each stream must therefore get its own 'error' listener.
readStream.on('error', (err) => {
  console.error('读取错误:', err);
  // Release the destination so its descriptor is not leaked.
  writeStream.destroy();
});
writeStream.on('error', (err) => {
  console.error('写入错误:', err);
});

// ========== pipeline: one callback sees every error ==========
pipeline(
  fs.createReadStream('input.txt'),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('管道错误:', err);
    } else {
      console.log('管道成功');
    }
  }
);
// ========== 使用async/await ==========
const util = require('util');
const pipelineAsync = util.promisify(pipeline);
// Copy a file through a stream pipeline.
// Logs a failure for diagnostics, then rethrows so the caller can react.
async function copyFile(source, dest) {
  try {
    await pipelineAsync(
      fs.createReadStream(source),
      fs.createWriteStream(dest)
    );
  } catch (err) {
    console.error('复制失败:', err);
    throw err;
  }
}
应用场景
1. 文件操作
const fs = require('fs');
const crypto = require('crypto');
// ========== 大文件复制 ==========
// Copy an arbitrarily large file with constant memory usage.
// Resolves once the destination is fully flushed; rejects on any stream error.
function copyLargeFile(source, dest) {
  return new Promise((resolve, reject) => {
    const input = fs.createReadStream(source);
    const output = fs.createWriteStream(dest);
    input.on('error', reject);
    output.on('error', reject);
    output.on('finish', resolve);
    input.pipe(output);
  });
}
// ========== 文件校验(流式计算hash)==========
// Compute a file's SHA-256 without ever loading it into memory at once.
// Resolves with the hex digest; rejects on read errors.
function hashFile(filename) {
  return new Promise((resolve, reject) => {
    const digest = crypto.createHash('sha256');
    const input = fs.createReadStream(filename);
    input.on('error', reject);
    input.on('data', (chunk) => digest.update(chunk));
    input.on('end', () => resolve(digest.digest('hex')));
  });
}
// ========== 大文件搜索 ==========
const readline = require('readline');
// Scan a (possibly huge) file line by line.
// Returns [{ lineNumber, line }] for every line containing `keyword`.
async function searchInFile(filename, keyword) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filename),
    crlfDelay: Infinity // treat \r\n as a single line break
  });
  const matches = [];
  let lineNumber = 0;
  for await (const line of rl) {
    lineNumber++;
    if (line.includes(keyword)) {
      matches.push({ lineNumber, line });
    }
  }
  return matches;
}
2. HTTP服务
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
// ========== 流式响应 ==========
const server = http.createServer((req, res) => {
  // Pipe the file straight into the response — no buffering in memory.
  const readStream = fs.createReadStream('large-file.txt');
  // Compress on the fly when the client advertises gzip support.
  const acceptEncoding = req.headers['accept-encoding'] || '';
  if (acceptEncoding.includes('gzip')) {
    res.writeHead(200, { 'Content-Encoding': 'gzip' });
    readStream.pipe(zlib.createGzip()).pipe(res);
  } else {
    readStream.pipe(res);
  }
  // Error handling: a 500 can only be sent while headers are still
  // unsent; once streaming has begun the connection must be torn down
  // instead. (The original unconditionally set statusCode, which is
  // ineffective after the headers have been flushed.)
  readStream.on('error', (err) => {
    if (res.headersSent) {
      res.destroy();
    } else {
      res.statusCode = 500;
      res.end('Server Error');
    }
  });
});
// ========== Streaming request handling ==========
// Consume an upload chunk by chunk instead of buffering the whole body.
const uploadServer = http.createServer((req, res) => {
  if (req.method === 'POST') {
    let size = 0;
    req.on('data', (chunk) => {
      size += chunk.length;
      console.log(`收到 ${chunk.length} 字节,总计 ${size}`);
      // Each chunk can be processed as soon as it arrives.
    });
    req.on('end', () => {
      res.end(`接收完成,共 ${size} 字节`);
    });
  } else {
    // The original never responded to non-POST requests, leaving
    // clients hanging; answer 404 so every request completes.
    res.statusCode = 404;
    res.end();
  }
});
3. 数据转换管道
const { Transform, pipeline } = require('stream');
const fs = require('fs');
// ========== CSV转JSON流 ==========
// Emits one plain object per CSV data row; the first row supplies the keys.
class CSVToJSON extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headers = null;  // column names taken from the first row
    this.remainder = '';  // trailing partial line carried between chunks
  }
  // Consume one complete CSV line: record it as the header row, or
  // emit it as an object keyed by the headers.
  _handleLine(line) {
    if (!this.headers) {
      this.headers = line.split(',');
      return;
    }
    const values = line.split(',');
    const obj = {};
    this.headers.forEach((h, i) => {
      obj[h.trim()] = values[i]?.trim();
    });
    this.push(obj);
  }
  _transform(chunk, encoding, callback) {
    const data = this.remainder + chunk.toString();
    const lines = data.split('\n');
    // The last piece may be an incomplete line; keep it for the next chunk.
    this.remainder = lines.pop();
    for (const line of lines) {
      this._handleLine(line);
    }
    callback();
  }
  _flush(callback) {
    // Fix: the original dereferenced this.headers unconditionally here,
    // throwing a TypeError on input that is a single line without a
    // trailing newline (headers were still null).
    if (this.remainder) {
      this._handleLine(this.remainder);
    }
    callback();
  }
}
// Usage: data.csv -> row objects -> NDJSON file, errors handled centrally.
pipeline(
  fs.createReadStream('data.csv'),
  new CSVToJSON(),
  new Transform({
    objectMode: true,
    // Serialize each row object as one line of JSON.
    transform(obj, enc, cb) {
      cb(null, JSON.stringify(obj) + '\n');
    }
  }),
  fs.createWriteStream('data.json'),
  (err) => {
    if (err) console.error(err);
    else console.log('转换完成');
  }
);
4. 打包工具底层
const { Transform } = require('stream');
const fs = require('fs');
const path = require('path');
// ========== 简单文件打包器 ==========
// Pack format: [file count: 4 bytes]
//              then per file: [name length: 2][name][size: 4][content]
// NOTE(review): files are read with readFileSync in _flush, so each file
// is buffered whole — fine for a demo, not for very large files.
class FilePacker extends Transform {
  constructor() {
    super();
    this.files = []; // paths queued for packing
  }
  addFile(filename) {
    this.files.push(filename);
  }
  _transform(chunk, encoding, callback) {
    // A real implementation would also accept streamed input here;
    // this demo packs everything in _flush.
    callback();
  }
  _flush(callback) {
    // Header: number of packed files.
    // (The original also tracked an `offset` counter that was written
    // but never read — dead code, removed.)
    const header = Buffer.alloc(4);
    header.writeUInt32BE(this.files.length, 0);
    this.push(header);
    for (const file of this.files) {
      const content = fs.readFileSync(file);
      const nameBuf = Buffer.from(path.basename(file));
      // File-name length (2 bytes) followed by the name itself.
      const nameLenBuf = Buffer.alloc(2);
      nameLenBuf.writeUInt16BE(nameBuf.length, 0);
      this.push(nameLenBuf);
      this.push(nameBuf);
      // Content size (4 bytes) followed by the content.
      const sizeBuf = Buffer.alloc(4);
      sizeBuf.writeUInt32BE(content.length, 0);
      this.push(sizeBuf);
      this.push(content);
    }
    callback();
  }
}
最佳实践
1. 始终处理错误
const fs = require('fs');
const { pipeline } = require('stream');
// 推荐:使用pipeline
pipeline(
fs.createReadStream('input.txt'),
fs.createWriteStream('output.txt'),
(err) => {
if (err) {
console.error('管道失败:', err);
}
}
);
// 不推荐:pipe不处理错误
// fs.createReadStream('input.txt').pipe(fs.createWriteStream('output.txt'));
2. 使用async/await
const util = require('util');
const stream = require('stream');
const pipeline = util.promisify(stream.pipeline);
async function processFiles() {
try {
await pipeline(
fs.createReadStream('input.txt'),
transformStream,
fs.createWriteStream('output.txt')
);
console.log('处理完成');
} catch (err) {
console.error('处理失败:', err);
}
}
3. 注意背压
const fs = require('fs');
// 手动处理背压
const readStream = fs.createReadStream('big.txt');
const writeStream = fs.createWriteStream('output.txt');
readStream.on('data', (chunk) => {
if (!writeStream.write(chunk)) {
readStream.pause();
}
});
writeStream.on('drain', () => {
readStream.resume();
});
// 或使用pipe自动处理
readStream.pipe(writeStream);
4. 资源清理
const fs = require('fs');
const readStream = fs.createReadStream('file.txt');
const writeStream = fs.createWriteStream('output.txt');
function cleanup() {
readStream.destroy();
writeStream.destroy();
}
readStream.on('error', (err) => {
console.error('读取错误:', err);
cleanup();
});
writeStream.on('error', (err) => {
console.error('写入错误:', err);
cleanup();
});
process.on('SIGINT', cleanup);
面试要点
- 核心概念:Stream是分块处理数据的抽象,有Readable、Writable、Duplex、Transform四种类型
- 管道机制:source.pipe(dest)自动处理数据流动和背压
- 背压处理:当消费速度慢于生产速度时,通过pause/resume或drain事件控制
- 错误处理:pipe不自动转发错误,应使用pipeline或分别监听error事件
- 应用场景:大文件处理、数据转换、HTTP流式传输、压缩加密
- 性能优势:内存占用低,响应延迟小,适合处理大数据
常见追问
-
Q: Stream和传统readFile有什么区别?
- A: Stream分块处理,内存占用稳定;readFile一次性读入内存,大文件会OOM
-
Q: 什么是背压(Back Pressure)?
- A: 当写入缓冲区满时,write返回false,应暂停读取直到drain事件
-
Q: pipe和pipeline有什么区别?
- A: pipeline是pipe的增强版,自动处理错误和流关闭,推荐使用