说说Node中的EventEmitter?如何实现一个EventEmitter
问题解析
EventEmitter是Node.js事件驱动架构的核心。面试官通过此题考察对观察者模式的理解,以及EventEmitter的实现原理和使用场景。掌握EventEmitter对于编写可扩展、松耦合的Node.js应用至关重要。
核心概念
什么是EventEmitter
EventEmitter是Node.js中实现发布-订阅模式(观察者模式)的基类。它允许对象在状态变化时发出事件,其他对象可以订阅这些事件并作出响应。
┌─────────────────────────────────────────────────────────────┐
│ EventEmitter 模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 发布者 │ │ 订阅者 │ │
│ │ (Emitter) │ │ (Listener) │ │
│ │ │ 'data' event │ │ │
│ │ emit('data',│ ─────────────────▶ │ on('data', │ │
│ │ chunk) │ │ handler) │ │
│ │ │ │ │ │
│ └──────────────┘ └──────────────┘ │
│ │ ▲ │
│ │ │ │
│ │ ┌──────────────┐ │ │
│ │ │ 订阅者 │ │ │
│ └────────▶│ (Listener) │──────────┘ │
│ │ on('data', │ │
│ │ handler) │ │
│ └──────────────┘ │
│ │
│ 特点: │
│ 1. 松耦合:发布者和订阅者不直接依赖 │
│ 2. 一对多:一个事件可以有多个监听器 │
│ 3. 异步:事件处理通常是异步的 │
│ │
└─────────────────────────────────────────────────────────────┘
EventEmitter的核心方法
const EventEmitter = require('events');
// 常用方法
const emitter = new EventEmitter();
// on/addListener: 添加监听器
emitter.on('event', listener);
emitter.addListener('event', listener);
// once: 添加一次性监听器
emitter.once('event', listener);
// emit: 触发事件
emitter.emit('event', arg1, arg2);
// off/removeListener: 移除监听器
emitter.off('event', listener);
emitter.removeListener('event', listener);
// removeAllListeners: 移除所有监听器
emitter.removeAllListeners('event');
emitter.removeAllListeners(); // 移除所有事件的所有监听器
// listenerCount: 获取监听器数量
emitter.listenerCount('event');
// eventNames: 获取已注册的事件名列表
emitter.eventNames();
详细解答
基本使用
const EventEmitter = require('events');
// ========== 创建EventEmitter实例 ==========
const emitter = new EventEmitter();
// ========== 添加监听器 ==========
emitter.on('message', (data) => {
console.log('收到消息:', data);
});
// 同一事件可以添加多个监听器
emitter.on('message', (data) => {
console.log('再次收到:', data);
});
// ========== 触发事件 ==========
emitter.emit('message', 'Hello World');
// 输出:
// 收到消息: Hello World
// 再次收到: Hello World
// ========== 传递多个参数 ==========
emitter.on('user:login', (username, ip, time) => {
console.log(`${username} 从 ${ip} 登录于 ${time}`);
});
emitter.emit('user:login', 'Alice', '192.168.1.1', new Date());
// ========== 一次性监听器 ==========
emitter.once('init', () => {
console.log('初始化完成(只执行一次)');
});
emitter.emit('init'); // 执行
emitter.emit('init'); // 不执行
// ========== 移除监听器 ==========
function handler(data) {
console.log('处理:', data);
}
emitter.on('data', handler);
emitter.emit('data', 'test'); // 执行
emitter.off('data', handler); // Node 10+
// 或 emitter.removeListener('data', handler);
emitter.emit('data', 'test'); // 不执行
继承EventEmitter
const EventEmitter = require('events');
// ========== 类继承EventEmitter ==========
class MyEmitter extends EventEmitter {
constructor() {
super();
this.data = [];
}
addData(item) {
this.data.push(item);
this.emit('data:add', item);
}
removeData(item) {
const index = this.data.indexOf(item);
if (index !== -1) {
this.data.splice(index, 1);
this.emit('data:remove', item);
}
}
}
// 使用
const myEmitter = new MyEmitter();
myEmitter.on('data:add', (item) => {
console.log('添加:', item);
});
myEmitter.on('data:remove', (item) => {
console.log('移除:', item);
});
myEmitter.addData('item1'); // 输出: 添加: item1
myEmitter.removeData('item1'); // 输出: 移除: item1
// ========== 混入模式 ==========
function EventEmitterMixin(Base) {
return class extends Base {
constructor(...args) {
super(...args);
this._events = {};
}
on(event, listener) {
if (!this._events[event]) {
this._events[event] = [];
}
this._events[event].push(listener);
return this;
}
emit(event, ...args) {
if (this._events[event]) {
this._events[event].forEach(listener => listener(...args));
}
return this;
}
};
}
// 使用混入
class MyClass extends EventEmitterMixin(Object) {
doSomething() {
this.emit('done', 'result');
}
}
高级特性
const EventEmitter = require('events');
// ========== 监听器数量限制 ==========
const emitter = new EventEmitter();
// 默认最多10个监听器,超出会警告
emitter.setMaxListeners(20); // 设置最大监听器数量
// 获取最大监听器数量
console.log(emitter.getMaxListeners()); // 20
// ========== 监听器优先级(执行顺序)==========
// 监听器按添加顺序执行
emitter.on('event', () => console.log('第一个'));
emitter.on('event', () => console.log('第二个'));
emitter.prependListener('event', () => console.log('最前面'));
emitter.emit('event');
// 输出:
// 最前面
// 第一个
// 第二个
// ========== 获取原始监听器 ==========
function handler() {}
emitter.on('event', handler);
// 获取监听器数组(副本)
const listeners = emitter.listeners('event');
console.log(listeners); // [ [Function: handler] ]
// 获取原始监听器(未经包装)
const rawListeners = emitter.rawListeners('event');
// ========== 错误处理 ==========
// 未处理的error事件会导致程序崩溃
emitter.on('error', (err) => {
console.error('错误:', err.message);
});
emitter.emit('error', new Error('出错了'));
// ========== newListener/removeListener事件 ==========
emitter.on('newListener', (event, listener) => {
console.log(`添加监听器到 ${event}`);
});
emitter.on('removeListener', (event, listener) => {
console.log(`从 ${event} 移除监听器`);
});
深入理解
EventEmitter实现原理
// ========== 简化版EventEmitter实现 ==========
class MyEventEmitter {
constructor() {
// 存储事件和监听器的映射
this._events = Object.create(null);
this._maxListeners = 10;
}
// 添加监听器
on(event, listener) {
if (typeof listener !== 'function') {
throw new TypeError('监听器必须是函数');
}
// 初始化事件数组
if (!this._events[event]) {
this._events[event] = [];
}
// 检查监听器数量
if (this._events[event].length >= this._maxListeners) {
console.warn(`MaxListenersExceededWarning: ${event} 监听器数量超过 ${this._maxListeners}`);
}
this._events[event].push(listener);
return this;
}
// 添加一次性监听器
once(event, listener) {
// 包装函数,执行后自动移除
const onceWrapper = (...args) => {
this.off(event, onceWrapper);
listener.apply(this, args);
};
// 保存原始引用,方便移除
onceWrapper.listener = listener;
this.on(event, onceWrapper);
return this;
}
// 触发事件
emit(event, ...args) {
const listeners = this._events[event];
if (!listeners || listeners.length === 0) {
// 特殊处理error事件
if (event === 'error') {
const err = args[0];
if (err instanceof Error) {
throw err;
}
throw new Error('Uncaught, unspecified "error" event');
}
return false;
}
// 复制数组,防止在迭代过程中修改
const listenersCopy = [...listeners];
for (const listener of listenersCopy) {
try {
listener.apply(this, args);
} catch (err) {
// 继续执行其他监听器
this.emit('error', err);
}
}
return true;
}
// 移除监听器
off(event, listener) {
const listeners = this._events[event];
if (!listeners) return this;
// 查找并移除
for (let i = listeners.length - 1; i >= 0; i--) {
const l = listeners[i];
// 比较原始监听器(处理once包装的情况)
if (l === listener || l.listener === listener) {
listeners.splice(i, 1);
break;
}
}
// 清理空数组
if (listeners.length === 0) {
delete this._events[event];
}
return this;
}
// 移除所有监听器
removeAllListeners(event) {
if (event) {
delete this._events[event];
} else {
this._events = Object.create(null);
}
return this;
}
// 获取监听器数量
listenerCount(event) {
const listeners = this._events[event];
return listeners ? listeners.length : 0;
}
// 获取所有事件名
eventNames() {
return Object.keys(this._events);
}
// 设置最大监听器数量
setMaxListeners(n) {
this._maxListeners = n;
return this;
}
}
// 使用示例
const myEmitter = new MyEventEmitter();
myEmitter.on('test', (data) => console.log('收到:', data));
myEmitter.once('test', () => console.log('只执行一次'));
myEmitter.emit('test', 'Hello');
// 输出:
// 收到: Hello
// 只执行一次
myEmitter.emit('test', 'World');
// 输出:
// 收到: World
异步事件处理
const EventEmitter = require('events');
// ========== 同步 vs 异步 ==========
const emitter = new EventEmitter();
// 默认是同步执行的
emitter.on('event', () => console.log('监听器1'));
emitter.on('event', () => console.log('监听器2'));
emitter.emit('event');
console.log('emit之后');
// 输出:
// 监听器1
// 监听器2
// emit之后
// ========== 异步执行监听器 ==========
class AsyncEventEmitter extends EventEmitter {
emitAsync(event, ...args) {
const listeners = this.listeners(event);
// 使用setImmediate让监听器异步执行
listeners.forEach(listener => {
setImmediate(() => listener(...args));
});
return listeners.length > 0;
}
// 或者使用Promise
async emitPromise(event, ...args) {
const listeners = this.listeners(event);
const results = [];
for (const listener of listeners) {
try {
const result = await listener(...args);
results.push(result);
} catch (err) {
this.emit('error', err);
}
}
return results;
}
}
// ========== 使用process.nextTick ==========
emitter.on('async-event', (data) => {
process.nextTick(() => {
console.log('异步处理:', data);
});
});
emitter.emit('async-event', 'test');
console.log('emit完成');
// 输出:
// emit完成
// 异步处理: test
事件命名空间
// ========== 简单命名空间实现 ==========
class NamespacedEventEmitter extends EventEmitter {
constructor() {
super();
this._namespaces = new Map();
}
// 带命名空间的事件监听
onNS(namespace, event, listener) {
const fullEvent = `${namespace}:${event}`;
return this.on(fullEvent, listener);
}
emitNS(namespace, event, ...args) {
const fullEvent = `${namespace}:${event}`;
return this.emit(fullEvent, ...args);
}
// 移除命名空间下的所有监听器
removeAllListenersNS(namespace) {
const events = this.eventNames();
for (const event of events) {
if (event.startsWith(`${namespace}:`)) {
this.removeAllListeners(event);
}
}
return this;
}
}
// 使用
const nsEmitter = new NamespacedEventEmitter();
nsEmitter.onNS('user', 'login', (user) => {
console.log('用户登录:', user);
});
nsEmitter.onNS('admin', 'login', (admin) => {
console.log('管理员登录:', admin);
});
nsEmitter.emitNS('user', 'login', { name: 'Alice' });
// 输出: 用户登录: { name: 'Alice' }
应用场景
1. 自定义事件类
const EventEmitter = require('events');
// ========== 数据库连接池 ==========
class ConnectionPool extends EventEmitter {
constructor(maxConnections = 10) {
super();
this.maxConnections = maxConnections;
this.connections = [];
this.waiting = [];
}
async acquire() {
if (this.connections.length < this.maxConnections) {
const conn = await this.createConnection();
this.connections.push(conn);
this.emit('acquire', conn);
return conn;
}
// 等待可用连接
return new Promise((resolve) => {
this.waiting.push(resolve);
this.emit('wait');
});
}
release(conn) {
const index = this.connections.indexOf(conn);
if (index !== -1) {
this.connections.splice(index, 1);
this.emit('release', conn);
// 通知等待的客户端
if (this.waiting.length > 0) {
const resolve = this.waiting.shift();
this.acquire().then(resolve);
}
}
}
async createConnection() {
// 模拟创建连接
return { id: Date.now(), query: () => {} };
}
}
// 使用
const pool = new ConnectionPool(5);
pool.on('acquire', (conn) => {
console.log('获取连接:', conn.id);
});
pool.on('wait', () => {
console.log('等待连接...');
});
(async () => {
const conn = await pool.acquire();
// 使用连接
pool.release(conn);
})();
2. 流式数据处理
const { Readable } = require('stream');
const EventEmitter = require('events');
// ========== 数据处理器 ==========
class DataProcessor extends EventEmitter {
constructor() {
super();
this.processedCount = 0;
this.errorCount = 0;
}
async process(data) {
this.emit('start', data);
try {
// 处理数据
const result = await this.transform(data);
this.processedCount++;
this.emit('success', result, data);
return result;
} catch (err) {
this.errorCount++;
this.emit('error', err, data);
throw err;
} finally {
this.emit('end', data);
}
}
async transform(data) {
// 实际转换逻辑
return data.toUpperCase();
}
}
// 使用
const processor = new DataProcessor();
processor.on('start', (data) => console.log('开始处理:', data));
processor.on('success', (result, original) => {
console.log('处理成功:', original, '->', result);
});
processor.on('error', (err) => console.error('处理失败:', err));
processor.on('end', () => console.log('处理结束'));
processor.process('hello');
3. 插件系统
const EventEmitter = require('events');
// ========== 可扩展的应用框架 ==========
class Application extends EventEmitter {
constructor() {
super();
this.plugins = new Map();
this.hooks = new Map();
}
// 注册插件
use(plugin, options = {}) {
if (typeof plugin !== 'function') {
throw new TypeError('Plugin must be a function');
}
const name = plugin.name || `plugin_${this.plugins.size}`;
if (this.plugins.has(name)) {
console.warn(`Plugin ${name} already registered`);
return this;
}
// 调用插件,传入app实例
plugin(this, options);
this.plugins.set(name, plugin);
this.emit('plugin:registered', name);
return this;
}
// 注册钩子
hook(name, handler) {
if (!this.hooks.has(name)) {
this.hooks.set(name, []);
}
this.hooks.get(name).push(handler);
}
// 执行钩子
async executeHook(name, context) {
const handlers = this.hooks.get(name) || [];
for (const handler of handlers) {
await handler(context);
}
this.emit(`hook:${name}`, context);
}
async start() {
this.emit('before:start');
await this.executeHook('start');
this.emit('start');
}
}
// 定义插件
function loggerPlugin(app, options) {
app.on('start', () => {
console.log('Application started');
});
app.hook('start', async () => {
console.log('Logger initialized');
});
}
function databasePlugin(app, options) {
app.hook('start', async () => {
console.log('Connecting to database...');
// 连接数据库
});
}
// 使用
const app = new Application();
app.use(loggerPlugin);
app.use(databasePlugin, { url: 'mongodb://localhost' });
app.start();
4. 状态机
const EventEmitter = require('events');
// ========== 有限状态机 ==========
class StateMachine extends EventEmitter {
constructor(initialState) {
super();
this.state = initialState;
this.transitions = new Map();
}
// 定义状态转换
define(fromState, event, toState, action) {
const key = `${fromState}:${event}`;
this.transitions.set(key, { toState, action });
}
// 触发事件
async trigger(event, data) {
const key = `${this.state}:${event}`;
const transition = this.transitions.get(key);
if (!transition) {
throw new Error(`Invalid transition: ${this.state} + ${event}`);
}
const { toState, action } = transition;
const fromState = this.state;
this.emit('before:transition', { from: fromState, to: toState, event });
try {
if (action) {
await action(data);
}
this.state = toState;
this.emit('transition', { from: fromState, to: toState, event });
this.emit(`state:${toState}`, data);
return toState;
} catch (err) {
this.emit('transition:error', { from: fromState, event, error: err });
throw err;
}
}
getState() {
return this.state;
}
}
// 使用:订单状态机
const orderFsm = new StateMachine('pending');
// 定义转换
orderFsm.define('pending', 'pay', 'paid', async (data) => {
console.log('处理支付...');
// 调用支付接口
});
orderFsm.define('paid', 'ship', 'shipped', async (data) => {
console.log('发货中...');
// 调用物流接口
});
orderFsm.define('shipped', 'deliver', 'delivered');
// 监听状态变化
orderFsm.on('transition', ({ from, to }) => {
console.log(`订单状态: ${from} -> ${to}`);
});
orderFsm.on('state:paid', () => {
console.log('订单已支付,发送通知');
});
// 执行转换
(async () => {
await orderFsm.trigger('pay', { amount: 100 });
await orderFsm.trigger('ship', { tracking: '123456' });
})();
最佳实践
1. 错误处理
const EventEmitter = require('events');
class SafeEmitter extends EventEmitter {
constructor() {
super();
// 必须监听error事件,否则未处理的error会导致崩溃
this.on('error', (err) => {
console.error('EventEmitter error:', err);
});
}
safeEmit(event, ...args) {
try {
return this.emit(event, ...args);
} catch (err) {
this.emit('error', err);
return false;
}
}
}
// 使用
const emitter = new SafeEmitter();
emitter.on('data', (data) => {
if (!data) {
throw new Error('Invalid data');
}
console.log(data);
});
emitter.safeEmit('data', null); // 不会崩溃
2. 内存管理
const EventEmitter = require('events');
// ========== 避免内存泄漏 ==========
class Resource extends EventEmitter {
constructor() {
super();
this.setMaxListeners(0); // 无限制,但要小心内存泄漏
}
// 使用once代替on,如果只需要监听一次
subscribeOnce(event, handler) {
this.once(event, handler);
}
// 及时移除不需要的监听器
unsubscribe(event, handler) {
this.off(event, handler);
}
// 销毁时清理所有监听器
destroy() {
this.removeAllListeners();
}
}
// ========== WeakRef实现(高级)==========
// 使用WeakRef避免监听器阻止垃圾回收
class WeakEventEmitter extends EventEmitter {
on(event, listener) {
// 包装监听器,使用WeakRef
const weakRef = new WeakRef(listener);
const wrapper = (...args) => {
const fn = weakRef.deref();
if (fn) {
fn(...args);
} else {
// 原监听器已被回收,移除包装器
this.off(event, wrapper);
}
};
return super.on(event, wrapper);
}
}
3. 类型安全(TypeScript)
import { EventEmitter } from 'events';
// 定义事件类型
interface MyEvents {
'user:login': (user: { id: string; name: string }) => void;
'user:logout': (userId: string) => void;
'error': (error: Error) => void;
}
// 类型安全的EventEmitter
class TypedEmitter extends EventEmitter {
on<K extends keyof MyEvents>(
event: K,
listener: MyEvents[K]
): this {
return super.on(event, listener);
}
emit<K extends keyof MyEvents>(
event: K,
...args: Parameters<MyEvents[K]>
): boolean {
return super.emit(event, ...args);
}
}
// 使用
const emitter = new TypedEmitter();
emitter.on('user:login', (user) => {
console.log(user.name); // 类型安全
});
// emitter.emit('user:login', 'invalid'); // 类型错误!
emitter.emit('user:login', { id: '1', name: 'Alice' }); // 正确
4. 测试
const EventEmitter = require('events');
const assert = require('assert');
// ========== 测试EventEmitter ==========
class MyClass extends EventEmitter {
doSomething() {
this.emit('done', 'result');
}
}
// 测试
describe('MyClass', () => {
it('should emit done event', (done) => {
const obj = new MyClass();
obj.on('done', (result) => {
assert.strictEqual(result, 'result');
done();
});
obj.doSomething();
});
it('should handle multiple listeners', () => {
const obj = new MyClass();
let count = 0;
obj.on('done', () => count++);
obj.on('done', () => count++);
obj.doSomething();
assert.strictEqual(count, 2);
});
});
面试要点
- 核心概念:EventEmitter实现发布-订阅模式,是Node.js事件驱动的基础
- 核心方法:on/once(订阅)、emit(发布)、off/removeListener(取消订阅)
- 实现原理:内部使用对象存储事件和监听器数组,emit时遍历调用
- 继承方式:类继承EventEmitter或混入模式
- 注意事项:必须处理error事件、注意监听器数量限制、避免内存泄漏
- 应用场景:自定义事件类、流处理、插件系统、状态机
常见追问
-
Q: EventEmitter是同步还是异步的?
- A: 默认是同步的,监听器按顺序同步执行;可以通过process.nextTick或setImmediate实现异步
-
Q: 为什么需要setMaxListeners?
- A: 防止内存泄漏,当监听器数量超过限制时发出警告
-
Q: once是如何实现的?
- A: 包装原始监听器,在执行时先移除自身再调用原始监听器
-
Q: 未处理的error事件会发生什么?
- A: 会导致程序抛出异常并崩溃,必须始终监听error事件