Node.js Cluster 模块详解
1. 问题解析
Node.js 是单线程运行的,这意味着默认情况下只能利用一个 CPU 核心。对于多核服务器来说,这是巨大的资源浪费。Cluster 模块允许创建多个子进程(工作进程),充分利用多核 CPU 的性能,同时保持代码的简洁性。理解 Cluster 模块的工作原理对于构建高性能、高可用的 Node.js 应用至关重要。
2. 核心概念
2.1 Node.js 单线程模型
┌─────────────────────────────────────────┐
│ 单线程 Node.js 进程 │
│ ┌─────────────────────────────────┐ │
│ │ JavaScript 代码 │ │
│ │ (单线程执行) │ │
│ └─────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ libuv 线程池 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌────┐ │ │
│ │ │线程1│ │线程2│ │线程3│ │线程4│ │ │
│ │ └─────┘ └─────┘ └─────┘ └────┘ │ │
│ └─────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ 操作系统(多核 CPU) │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌────┐ │ │
│ │ │核心1│ │核心2│ │核心3│ │核心4│ │ │
│ │ └─────┘ └─────┘ └─────┘ └────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
2.2 Cluster 多进程模型
┌─────────────────────────────────────────┐
│ 主进程 (Master) │
│ ┌──────────────────┐ │
│ │ 负责调度管理 │ │
│ │ - 创建/重启子进程 │ │
│ │ - 负载均衡 │ │
│ │ - 进程间通信 │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ ↓ ↓ ↓ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Worker│ │Worker│ │Worker│ │
│ │ #1 │ │ #2 │ │ #3 │ │
│ └──┬───┘ └──┬───┘ └──┬───┘ │
└────┼───────────┼───────────┼───────────┘
│ │ │
└───────────┼───────────┘
↓
┌───────────────┐
│ 共享端口 │
│ (Round Robin) │
└───────────────┘
2.3 进程间关系
| 角色 | 职责 | 数量 |
|---|---|---|
| Master | 进程管理、负载均衡、不处理业务 | 1 |
| Worker | 处理 HTTP 请求、执行业务逻辑 | 通常等于 CPU 核心数 |
3. 详细解答
3.1 Cluster 基础使用
const cluster = require('cluster');
const http = require('http');
const os = require('os');
const numCPUs = os.cpus().length;
if (cluster.isMaster) {
console.log(`主进程 ${process.pid} 正在运行`);
// 创建与 CPU 核心数相同的工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 监听工作进程事件
cluster.on('fork', (worker) => {
console.log(`工作进程 ${worker.process.pid} 已创建`);
});
cluster.on('online', (worker) => {
console.log(`工作进程 ${worker.process.pid} 已就绪`);
});
cluster.on('listening', (worker, address) => {
console.log(`工作进程 ${worker.process.pid} 正在监听 ${address.port}`);
});
cluster.on('disconnect', (worker) => {
console.log(`工作进程 ${worker.process.pid} 已断开连接`);
});
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 已退出 (code: ${code}, signal: ${signal})`);
// 自动重启工作进程
console.log('正在创建新的工作进程...');
cluster.fork();
});
// 主进程收到消息
Object.values(cluster.workers).forEach(worker => {
worker.on('message', (msg) => {
console.log(`主进程收到来自 ${worker.process.pid} 的消息:`, msg);
});
});
} else {
// 工作进程创建 HTTP 服务器
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from worker ${process.pid}\n`);
// 模拟工作进程向主进程发送消息
process.send({ cmd: 'notifyRequest', pid: process.pid });
});
server.listen(8000);
console.log(`工作进程 ${process.pid} 已启动`);
}
3.2 进程间通信 (IPC)
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const workers = [];
// 创建工作进程
for (let i = 0; i < os.cpus().length; i++) {
const worker = cluster.fork();
workers.push(worker);
// 接收工作进程消息
worker.on('message', (msg) => {
if (msg.cmd === 'getSharedData') {
// 主进程查询共享数据后返回
worker.send({
cmd: 'sharedDataResponse',
data: getSharedData(msg.key)
});
} else if (msg.cmd === 'broadcast') {
// 向所有工作进程广播消息
workers.forEach(w => {
if (w.id !== worker.id) {
w.send({
cmd: 'broadcastMessage',
from: worker.id,
data: msg.data
});
}
});
}
});
}
// 模拟共享数据存储
const sharedData = new Map();
function getSharedData(key) {
return sharedData.get(key);
}
} else {
// 工作进程
const http = require('http');
// 接收主进程消息
process.on('message', (msg) => {
if (msg.cmd === 'sharedDataResponse') {
console.log(`Worker ${process.pid} 收到共享数据:`, msg.data);
} else if (msg.cmd === 'broadcastMessage') {
console.log(`Worker ${process.pid} 收到广播:`, msg.data);
}
});
const server = http.createServer((req, res) => {
if (req.url === '/get-data') {
// 向主进程请求共享数据
process.send({ cmd: 'getSharedData', key: 'config' });
res.end('Request sent to master');
} else if (req.url === '/broadcast') {
// 向所有工作进程广播
process.send({ cmd: 'broadcast', data: { message: 'Hello all!' } });
res.end('Broadcast sent');
} else {
res.end(`Worker ${process.pid}`);
}
});
server.listen(8000);
}
3.3 负载均衡策略
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
// 设置负载均衡策略
// cluster.SCHED_NONE - 由操作系统决定(默认在 Windows 上)
// cluster.SCHED_RR - Round Robin 轮询(默认在非 Windows 上)
cluster.schedulingPolicy = cluster.SCHED_RR;
console.log('负载均衡策略:', cluster.schedulingPolicy === cluster.SCHED_RR ? 'Round Robin' : 'None');
for (let i = 0; i < os.cpus().length; i++) {
cluster.fork();
}
} else {
const http = require('http');
// 模拟不同处理时间的请求
http.createServer(async (req, res) => {
const delay = Math.random() * 1000;
await sleep(delay);
res.writeHead(200);
res.end(JSON.stringify({
worker: process.pid,
delay: Math.round(delay)
}));
}).listen(8000);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
3.4 优雅关闭与零停机部署
const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isMaster) {
const workers = new Map();
// 创建工作进程
function createWorker() {
const worker = cluster.fork();
workers.set(worker.id, worker);
worker.on('exit', (code, signal) => {
workers.delete(worker.id);
// 如果不是主动关闭,则重启
if (signal !== 'SIGTERM') {
console.log(`工作进程 ${worker.process.pid} 异常退出,正在重启...`);
createWorker();
}
});
return worker;
}
// 初始创建工作进程
for (let i = 0; i < os.cpus().length; i++) {
createWorker();
}
// 优雅重启所有工作进程
function gracefulReload() {
console.log('开始优雅重启...');
const workerIds = Array.from(workers.keys());
let index = 0;
function restartNext() {
if (index >= workerIds.length) {
console.log('所有工作进程已重启');
return;
}
const workerId = workerIds[index++];
const worker = workers.get(workerId);
if (worker) {
console.log(`正在重启工作进程 ${worker.process.pid}...`);
// 发送断开连接信号
worker.disconnect();
// 超时强制终止
const timeout = setTimeout(() => {
console.log(`工作进程 ${worker.process.pid} 超时,强制终止`);
worker.kill('SIGTERM');
}, 5000);
worker.on('disconnect', () => {
clearTimeout(timeout);
console.log(`工作进程 ${worker.process.pid} 已断开连接`);
});
// 创建新工作进程
setTimeout(() => {
createWorker();
restartNext();
}, 1000);
}
}
restartNext();
}
// 监听信号进行优雅重启
process.on('SIGUSR2', gracefulReload);
// 优雅关闭
process.on('SIGTERM', () => {
console.log('主进程收到 SIGTERM,开始优雅关闭...');
workers.forEach(worker => {
worker.send({ cmd: 'shutdown' });
worker.disconnect();
});
// 超时后强制退出
setTimeout(() => {
console.log('强制退出');
process.exit(1);
}, 10000);
});
} else {
let server;
let isShuttingDown = false;
// 创建服务器
server = http.createServer((req, res) => {
if (isShuttingDown) {
res.writeHead(503, { 'Connection': 'close' });
res.end('Server is shutting down');
return;
}
// 模拟请求处理
res.writeHead(200);
res.end(`Response from worker ${process.pid}`);
});
server.listen(8000, () => {
console.log(`Worker ${process.pid} 开始监听`);
});
// 处理来自主进程的消息
process.on('message', (msg) => {
if (msg.cmd === 'shutdown') {
console.log(`Worker ${process.pid} 收到关闭信号`);
isShuttingDown = true;
// 关闭服务器,停止接受新连接
server.close(() => {
console.log(`Worker ${process.pid} 服务器已关闭`);
// 等待现有连接处理完成
setTimeout(() => {
process.exit(0);
}, 2000);
});
}
});
// 处理未捕获的异常
process.on('uncaughtException', (err) => {
console.error(`Worker ${process.pid} 未捕获异常:`, err);
// 优雅关闭
server.close(() => {
process.exit(1);
});
// 超时强制退出
setTimeout(() => {
process.exit(1);
}, 5000);
});
}
4. 深入理解
4.1 Cluster 底层原理
const cluster = require('cluster');
const net = require('net');
// Cluster 底层使用 child_process.fork() 创建子进程
// 并通过 IPC 通道进行通信
if (cluster.isMaster) {
console.log('Master 进程分析:');
console.log('- 负责创建 Worker 进程');
console.log('- 监听 Worker 事件(online, listening, disconnect, exit)');
console.log('- 处理 Worker 间负载均衡');
// 查看 cluster 模块内部实现
console.log('Worker 列表:', cluster.workers);
const worker = cluster.fork();
// 向 Worker 发送消息
worker.send({ hello: 'worker' });
// 接收 Worker 消息
worker.on('message', (msg) => {
console.log('收到 Worker 消息:', msg);
});
// 断开 Worker 连接
setTimeout(() => {
worker.disconnect();
}, 5000);
} else {
console.log(`Worker ${process.pid} 分析:`);
console.log('- 独立的 Node.js 进程');
console.log('- 有自己的 V8 实例和事件循环');
console.log('- 内存不共享');
console.log('- 通过 IPC 与 Master 通信');
// Worker 接收 Master 消息
process.on('message', (msg) => {
console.log('收到 Master 消息:', msg);
// 回复 Master
process.send({ reply: 'hello master', pid: process.pid });
});
// 创建服务器
const server = net.createServer((socket) => {
socket.end(`Handled by worker ${process.pid}\n`);
});
server.listen(8000, () => {
console.log(`Worker ${process.pid} 监听端口 8000`);
});
}
4.2 进程间内存共享方案
const cluster = require('cluster');
/**
* 由于 Worker 间内存不共享,需要使用外部存储
* 方案:Redis、共享内存、数据库
*/
// 方案 1: 使用 Redis 共享数据
const Redis = require('ioredis');
class SharedState {
constructor() {
this.redis = new Redis();
this.localCache = new Map();
}
async get(key) {
// 先查本地缓存
if (this.localCache.has(key)) {
return this.localCache.get(key);
}
// 查 Redis
const value = await this.redis.get(key);
if (value) {
this.localCache.set(key, value);
}
return value;
}
async set(key, value, ttl = 3600) {
this.localCache.set(key, value);
await this.redis.setex(key, ttl, value);
}
// 发布订阅模式实现跨进程通知
subscribe(channel, callback) {
const subscriber = new Redis();
subscriber.subscribe(channel);
subscriber.on('message', (ch, message) => {
callback(JSON.parse(message));
});
}
publish(channel, message) {
this.redis.publish(channel, JSON.stringify(message));
}
}
// 方案 2: 使用共享内存(仅限于同一台机器)
const SharedArrayBuffer = require('worker_threads').SharedArrayBuffer;
// 注意:SharedArrayBuffer 主要用于 Worker Threads,
// Cluster 进程间需要使用其他 IPC 机制
// 方案 3: 使用数据库
const sharedDB = {
async getCounter(name) {
// 使用数据库事务保证原子性
const result = await db.query(
'UPDATE counters SET value = value + 1 WHERE name = ? RETURNING value',
[name]
);
return result.rows[0].value;
}
};
4.3 与 Worker Threads 对比
/**
* Cluster vs Worker Threads
*
* Cluster:
* - 多个独立进程
* - 适合 I/O 密集型任务
* - 进程崩溃不影响其他进程
* - 内存不共享
* - 适合 Web 服务器
*
* Worker Threads:
* - 同一进程内的多个线程
* - 适合 CPU 密集型任务
* - 可以共享内存(SharedArrayBuffer)
* - 一个线程崩溃可能导致整个进程崩溃
* - 适合复杂计算
*/
// Worker Threads 示例
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
// 主线程
console.log('使用 Worker Threads 进行 CPU 密集型计算');
const worker = new Worker(__filename, {
workerData: { n: 40 } // 传递数据给 Worker
});
worker.on('message', (result) => {
console.log('计算结果:', result);
});
worker.on('error', console.error);
} else {
// Worker 线程
function fibonacci(n) {
return n < 2 ? n : fibonacci(n - 1) + fibonacci(n - 2);
}
const result = fibonacci(workerData.n);
parentPort.postMessage(result);
}
// Cluster + Worker Threads 结合使用
// Cluster 处理 HTTP 请求,Worker Threads 处理 CPU 密集型任务
const cluster = require('cluster');
const { Worker } = require('worker_threads');
if (cluster.isMaster) {
// Cluster Master 创建多个 Worker 进程
for (let i = 0; i < require('os').cpus().length; i++) {
cluster.fork();
}
} else {
// Cluster Worker 进程
const http = require('http');
http.createServer((req, res) => {
if (req.url === '/compute') {
// 使用 Worker Thread 进行计算
const worker = new Worker('./compute-worker.js', {
workerData: { n: 35 }
});
worker.on('message', (result) => {
res.end(`Result: ${result}`);
});
worker.on('error', (err) => {
res.statusCode = 500;
res.end(`Error: ${err.message}`);
});
} else {
res.end(`Hello from process ${process.pid}`);
}
}).listen(8000);
}
5. 最佳实践
5.1 生产级 Cluster 配置
const cluster = require('cluster');
const os = require('os');
const http = require('http');
const CONFIG = {
// 工作进程数,通常等于 CPU 核心数
workers: process.env.WORKERS || os.cpus().length,
// 优雅关闭超时(毫秒)
shutdownTimeout: 10000,
// 进程重启间隔(毫秒)
restartDelay: 1000,
// 最大重启次数(防止无限重启)
maxRestarts: 5
};
if (cluster.isMaster) {
console.log(`Master ${process.pid} 启动`);
const restartCount = new Map();
// 创建工作进程
for (let i = 0; i < CONFIG.workers; i++) {
createWorker();
}
function createWorker() {
const worker = cluster.fork();
restartCount.set(worker.id, 0);
worker.on('exit', (code, signal) => {
const count = restartCount.get(worker.id) || 0;
if (signal === 'SIGTERM' || code === 0) {
console.log(`Worker ${worker.process.pid} 正常退出`);
return;
}
if (count >= CONFIG.maxRestarts) {
console.error(`Worker ${worker.process.pid} 重启次数过多,放弃重启`);
return;
}
console.log(`Worker ${worker.process.pid} 退出,准备重启 (${count + 1}/${CONFIG.maxRestarts})`);
setTimeout(() => {
const newWorker = createWorker();
restartCount.set(newWorker.id, count + 1);
}, CONFIG.restartDelay);
});
return worker;
}
// 优雅重启
process.on('SIGUSR2', () => {
console.log('收到 SIGUSR2,开始优雅重启...');
const workers = Object.values(cluster.workers);
let index = 0;
function restartNext() {
if (index >= workers.length) return;
const worker = workers[index++];
worker.send({ cmd: 'graceful-shutdown' });
worker.disconnect();
const timeout = setTimeout(() => {
worker.kill('SIGTERM');
}, CONFIG.shutdownTimeout);
worker.on('disconnect', () => {
clearTimeout(timeout);
createWorker();
setTimeout(restartNext, CONFIG.restartDelay);
});
}
restartNext();
});
// 优雅关闭
process.on('SIGTERM', () => {
console.log('收到 SIGTERM,开始优雅关闭...');
Object.values(cluster.workers).forEach(worker => {
worker.send({ cmd: 'graceful-shutdown' });
worker.disconnect();
});
setTimeout(() => {
console.log('强制退出');
process.exit(1);
}, CONFIG.shutdownTimeout);
});
} else {
// Worker 进程
let server;
let isShuttingDown = false;
// 健康检查端点
const healthChecks = {
'/health': () => ({ status: 'ok', pid: process.pid }),
'/ready': () => isShuttingDown ? null : { status: 'ready' }
};
server = http.createServer((req, res) => {
// 健康检查
if (healthChecks[req.url]) {
const result = healthChecks[req.url]();
if (result) {
res.writeHead(200);
res.end(JSON.stringify(result));
} else {
res.writeHead(503);
res.end(JSON.stringify({ status: 'not ready' }));
}
return;
}
if (isShuttingDown) {
res.writeHead(503, { 'Connection': 'close' });
res.end('Server is shutting down');
return;
}
// 业务逻辑
res.writeHead(200);
res.end(`Hello from ${process.pid}`);
});
server.listen(8000, () => {
console.log(`Worker ${process.pid} 启动,监听端口 8000`);
});
// 处理关闭信号
process.on('message', (msg) => {
if (msg.cmd === 'graceful-shutdown') {
isShuttingDown = true;
server.close(() => {
console.log(`Worker ${process.pid} 服务器已关闭`);
process.exit(0);
});
setTimeout(() => {
console.log(`Worker ${process.pid} 关闭超时`);
process.exit(1);
}, CONFIG.shutdownTimeout);
}
});
// 错误处理
process.on('uncaughtException', (err) => {
console.error(`Worker ${process.pid} 未捕获异常:`, err);
process.exit(1);
});
process.on('unhandledRejection', (reason, promise) => {
console.error(`Worker ${process.pid} 未处理 Promise 拒绝:`, reason);
});
}
5.2 PM2 集群模式原理
/**
* PM2 是一个进程管理器,其集群模式基于 Node.js Cluster 模块
* 但提供了更多功能:
* - 自动负载均衡
* - 零停机重载
* - 日志管理
* - 监控
* - 进程守护
*/
// ecosystem.config.js - PM2 配置文件
module.exports = {
apps: [{
name: 'my-app',
script: './app.js',
// 集群模式
exec_mode: 'cluster',
// 实例数(0 = 使用所有 CPU)
instances: 0,
// 自动重启
autorestart: true,
// 内存限制(超过则重启)
max_memory_restart: '1G',
// 日志
log_file: './logs/combined.log',
out_file: './logs/out.log',
error_file: './logs/error.log',
// 环境变量
env: {
NODE_ENV: 'development'
},
env_production: {
NODE_ENV: 'production'
},
// 监听文件变化(开发模式)
watch: false,
// 忽略的文件
ignore_watch: ['node_modules', 'logs'],
// 优雅关闭超时
kill_timeout: 5000,
// 监听端口(PM2 会自动分配)
// 应用内使用 process.env.PORT
// 合并日志(所有实例写入同一文件)
merge_logs: true,
// 日志日期格式
log_date_format: 'YYYY-MM-DD HH:mm:ss Z'
}]
};
// PM2 常用命令
// pm2 start ecosystem.config.js
// pm2 reload my-app # 零停机重载
// pm2 scale my-app +2 # 增加 2 个实例
// pm2 scale my-app 4 # 设置实例数为 4
// pm2 monit # 监控
// pm2 logs # 查看日志
5.3 状态外化与 Session 共享
const cluster = require('cluster');
const session = require('koa-session');
const RedisStore = require('koa-redis');
/**
* 在 Cluster 模式下,Session 必须存储在外部存储中
* 不能依赖进程内存
*/
const Koa = require('koa');
const app = new Koa();
// 使用 Redis 存储 Session
app.keys = ['your-secret-key'];
app.use(session({
store: new RedisStore({
host: 'localhost',
port: 6379
}),
key: 'koa:sess',
maxAge: 86400000, // 1天
httpOnly: true,
secure: false
}, app));
// 其他需要共享的状态也使用 Redis
const redis = require('redis').createClient();
// 共享计数器
async function incrementCounter(name) {
return await redis.incr(`counter:${name}`);
}
// 共享配置
async function getConfig(key) {
return await redis.get(`config:${key}`);
}
// 分布式锁
async function acquireLock(lockName, timeout = 10000) {
const token = Date.now().toString();
const acquired = await redis.set(
`lock:${lockName}`,
token,
'PX',
timeout,
'NX'
);
return acquired ? token : null;
}
async function releaseLock(lockName, token) {
const current = await redis.get(`lock:${lockName}`);
if (current === token) {
await redis.del(`lock:${lockName}`);
}
}
6. 面试要点
-
为什么需要 Cluster 模块
- Node.js 单线程只能利用一个 CPU 核心
- 多核服务器需要创建多个进程来充分利用资源
- 提高应用吞吐量和可用性
-
Master 和 Worker 的职责分工
- Master:进程管理、负载均衡、故障恢复、不处理业务请求
- Worker:处理 HTTP 请求、执行业务逻辑
- Master 和 Worker 通过 IPC 通信
-
负载均衡策略
- Round Robin(轮询):默认策略,依次分配请求
- None:由操作系统决定(Windows 默认)
- 可以通过
cluster.schedulingPolicy设置
-
进程间通信方式
worker.send()/process.send():发送消息worker.on('message')/process.on('message'):接收消息- 由于内存不共享,需要通过 IPC 或外部存储(Redis)共享数据
-
Cluster 与 Worker Threads 的区别
- Cluster:多进程,适合 I/O 密集型,进程崩溃隔离
- Worker Threads:多线程,适合 CPU 密集型,可共享内存
- 可以结合使用:Cluster 处理 HTTP,Worker Threads 处理计算
-
PM2 集群模式的优势
- 基于 Cluster 模块,但提供更多功能
- 零停机重载、自动重启、内存限制
- 日志管理、监控、进程守护
- 简化 Cluster 的使用
-
生产环境注意事项
- Session 和状态必须外化到 Redis 等存储
- 实现优雅关闭,避免请求中断
- 设置进程重启限制,防止无限重启
- 实现健康检查端点
- 处理未捕获异常和 Promise 拒绝