返回首页

说说Node中的EventEmitter?如何实现一个EventEmitter

问题解析

EventEmitter是Node.js事件驱动架构的核心。面试官通过此题考察对观察者模式的理解,以及EventEmitter的实现原理和使用场景。掌握EventEmitter对于编写可扩展、松耦合的Node.js应用至关重要。

核心概念

什么是EventEmitter

EventEmitter是Node.js中实现发布-订阅模式(观察者模式)的基类。它允许对象在状态变化时发出事件,其他对象可以订阅这些事件并作出响应。

┌─────────────────────────────────────────────────────────────┐
│                    EventEmitter 模式                         │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌──────────────┐                    ┌──────────────┐      │
│   │   发布者      │                    │   订阅者      │      │
│   │  (Emitter)   │                    │  (Listener)  │      │
│   │              │    'data' event    │              │      │
│   │  emit('data',│ ─────────────────▶ │  on('data',  │      │
│   │    chunk)    │                    │   handler)   │      │
│   │              │                    │              │      │
│   └──────────────┘                    └──────────────┘      │
│          │                                   ▲               │
│          │                                   │               │
│          │         ┌──────────────┐          │               │
│          │         │   订阅者      │          │               │
│          └────────▶│  (Listener)  │──────────┘               │
│                    │  on('data',  │                          │
│                    │   handler)   │                          │
│                    └──────────────┘                          │
│                                                              │
│   特点:                                                      │
│   1. 松耦合:发布者和订阅者不直接依赖                          │
│   2. 一对多:一个事件可以有多个监听器                          │
│   3. 异步:事件处理通常是异步的                                │
│                                                              │
└─────────────────────────────────────────────────────────────┘

EventEmitter的核心方法

const EventEmitter = require('events');

// 常用方法
const emitter = new EventEmitter();

// on/addListener: 添加监听器
emitter.on('event', listener);
emitter.addListener('event', listener);

// once: 添加一次性监听器
emitter.once('event', listener);

// emit: 触发事件
emitter.emit('event', arg1, arg2);

// off/removeListener: 移除监听器
emitter.off('event', listener);
emitter.removeListener('event', listener);

// removeAllListeners: 移除所有监听器
emitter.removeAllListeners('event');
emitter.removeAllListeners(); // 移除所有事件的所有监听器

// listenerCount: 获取监听器数量
emitter.listenerCount('event');

// eventNames: 获取已注册的事件名列表
emitter.eventNames();

详细解答

基本使用

const EventEmitter = require('events');

// ========== 创建EventEmitter实例 ==========
const emitter = new EventEmitter();

// ========== 添加监听器 ==========
emitter.on('message', (data) => {
  console.log('收到消息:', data);
});

// 同一事件可以添加多个监听器
emitter.on('message', (data) => {
  console.log('再次收到:', data);
});

// ========== 触发事件 ==========
emitter.emit('message', 'Hello World');
// 输出:
// 收到消息: Hello World
// 再次收到: Hello World

// ========== 传递多个参数 ==========
emitter.on('user:login', (username, ip, time) => {
  console.log(`${username}${ip} 登录于 ${time}`);
});

emitter.emit('user:login', 'Alice', '192.168.1.1', new Date());

// ========== 一次性监听器 ==========
emitter.once('init', () => {
  console.log('初始化完成(只执行一次)');
});

emitter.emit('init'); // 执行
emitter.emit('init'); // 不执行

// ========== 移除监听器 ==========
function handler(data) {
  console.log('处理:', data);
}

emitter.on('data', handler);
emitter.emit('data', 'test'); // 执行

emitter.off('data', handler); // Node 10+
// 或 emitter.removeListener('data', handler);
emitter.emit('data', 'test'); // 不执行

继承EventEmitter

const EventEmitter = require('events');

// ========== 类继承EventEmitter ==========
class MyEmitter extends EventEmitter {
  constructor() {
    super();
    this.data = [];
  }

  addData(item) {
    this.data.push(item);
    this.emit('data:add', item);
  }

  removeData(item) {
    const index = this.data.indexOf(item);
    if (index !== -1) {
      this.data.splice(index, 1);
      this.emit('data:remove', item);
    }
  }
}

// 使用
const myEmitter = new MyEmitter();

myEmitter.on('data:add', (item) => {
  console.log('添加:', item);
});

myEmitter.on('data:remove', (item) => {
  console.log('移除:', item);
});

myEmitter.addData('item1');    // 输出: 添加: item1
myEmitter.removeData('item1'); // 输出: 移除: item1

// ========== 混入模式 ==========
function EventEmitterMixin(Base) {
  return class extends Base {
    constructor(...args) {
      super(...args);
      this._events = {};
    }

    on(event, listener) {
      if (!this._events[event]) {
        this._events[event] = [];
      }
      this._events[event].push(listener);
      return this;
    }

    emit(event, ...args) {
      if (this._events[event]) {
        this._events[event].forEach(listener => listener(...args));
      }
      return this;
    }
  };
}

// 使用混入
class MyClass extends EventEmitterMixin(Object) {
  doSomething() {
    this.emit('done', 'result');
  }
}

高级特性

const EventEmitter = require('events');

// ========== 监听器数量限制 ==========
const emitter = new EventEmitter();

// 默认最多10个监听器,超出会警告
emitter.setMaxListeners(20); // 设置最大监听器数量

// 获取最大监听器数量
console.log(emitter.getMaxListeners()); // 20

// ========== 监听器优先级(执行顺序)==========
// 监听器按添加顺序执行
emitter.on('event', () => console.log('第一个'));
emitter.on('event', () => console.log('第二个'));
emitter.prependListener('event', () => console.log('最前面'));

emitter.emit('event');
// 输出:
// 最前面
// 第一个
// 第二个

// ========== 获取原始监听器 ==========
function handler() {}
emitter.on('event', handler);

// 获取监听器数组(副本)
const listeners = emitter.listeners('event');
console.log(listeners); // [ [Function: handler] ]

// 获取原始监听器(未经包装)
const rawListeners = emitter.rawListeners('event');

// ========== 错误处理 ==========
// 未处理的error事件会导致程序崩溃
emitter.on('error', (err) => {
  console.error('错误:', err.message);
});

emitter.emit('error', new Error('出错了'));

// ========== newListener/removeListener事件 ==========
emitter.on('newListener', (event, listener) => {
  console.log(`添加监听器到 ${event}`);
});

emitter.on('removeListener', (event, listener) => {
  console.log(`从 ${event} 移除监听器`);
});

深入理解

EventEmitter实现原理

// ========== 简化版EventEmitter实现 ==========
class MyEventEmitter {
  constructor() {
    // 存储事件和监听器的映射
    this._events = Object.create(null);
    this._maxListeners = 10;
  }

  // 添加监听器
  on(event, listener) {
    if (typeof listener !== 'function') {
      throw new TypeError('监听器必须是函数');
    }

    // 初始化事件数组
    if (!this._events[event]) {
      this._events[event] = [];
    }

    // 检查监听器数量
    if (this._events[event].length >= this._maxListeners) {
      console.warn(`MaxListenersExceededWarning: ${event} 监听器数量超过 ${this._maxListeners}`);
    }

    this._events[event].push(listener);
    return this;
  }

  // 添加一次性监听器
  once(event, listener) {
    // 包装函数,执行后自动移除
    const onceWrapper = (...args) => {
      this.off(event, onceWrapper);
      listener.apply(this, args);
    };

    // 保存原始引用,方便移除
    onceWrapper.listener = listener;
    this.on(event, onceWrapper);
    return this;
  }

  // 触发事件
  emit(event, ...args) {
    const listeners = this._events[event];

    if (!listeners || listeners.length === 0) {
      // 特殊处理error事件
      if (event === 'error') {
        const err = args[0];
        if (err instanceof Error) {
          throw err;
        }
        throw new Error('Uncaught, unspecified "error" event');
      }
      return false;
    }

    // 复制数组,防止在迭代过程中修改
    const listenersCopy = [...listeners];

    for (const listener of listenersCopy) {
      try {
        listener.apply(this, args);
      } catch (err) {
        // 继续执行其他监听器
        this.emit('error', err);
      }
    }

    return true;
  }

  // 移除监听器
  off(event, listener) {
    const listeners = this._events[event];

    if (!listeners) return this;

    // 查找并移除
    for (let i = listeners.length - 1; i >= 0; i--) {
      const l = listeners[i];
      // 比较原始监听器(处理once包装的情况)
      if (l === listener || l.listener === listener) {
        listeners.splice(i, 1);
        break;
      }
    }

    // 清理空数组
    if (listeners.length === 0) {
      delete this._events[event];
    }

    return this;
  }

  // 移除所有监听器
  removeAllListeners(event) {
    if (event) {
      delete this._events[event];
    } else {
      this._events = Object.create(null);
    }
    return this;
  }

  // 获取监听器数量
  listenerCount(event) {
    const listeners = this._events[event];
    return listeners ? listeners.length : 0;
  }

  // 获取所有事件名
  eventNames() {
    return Object.keys(this._events);
  }

  // 设置最大监听器数量
  setMaxListeners(n) {
    this._maxListeners = n;
    return this;
  }
}

// 使用示例
const myEmitter = new MyEventEmitter();

myEmitter.on('test', (data) => console.log('收到:', data));
myEmitter.once('test', () => console.log('只执行一次'));

myEmitter.emit('test', 'Hello');
// 输出:
// 收到: Hello
// 只执行一次

myEmitter.emit('test', 'World');
// 输出:
// 收到: World

异步事件处理

const EventEmitter = require('events');

// ========== 同步 vs 异步 ==========
const emitter = new EventEmitter();

// 默认是同步执行的
emitter.on('event', () => console.log('监听器1'));
emitter.on('event', () => console.log('监听器2'));
emitter.emit('event');
console.log('emit之后');
// 输出:
// 监听器1
// 监听器2
// emit之后

// ========== 异步执行监听器 ==========
class AsyncEventEmitter extends EventEmitter {
  emitAsync(event, ...args) {
    const listeners = this.listeners(event);

    // 使用setImmediate让监听器异步执行
    listeners.forEach(listener => {
      setImmediate(() => listener(...args));
    });

    return listeners.length > 0;
  }

  // 或者使用Promise
  async emitPromise(event, ...args) {
    const listeners = this.listeners(event);
    const results = [];

    for (const listener of listeners) {
      try {
        const result = await listener(...args);
        results.push(result);
      } catch (err) {
        this.emit('error', err);
      }
    }

    return results;
  }
}

// ========== 使用process.nextTick ==========
emitter.on('async-event', (data) => {
  process.nextTick(() => {
    console.log('异步处理:', data);
  });
});

emitter.emit('async-event', 'test');
console.log('emit完成');
// 输出:
// emit完成
// 异步处理: test

事件命名空间

// ========== 简单命名空间实现 ==========
class NamespacedEventEmitter extends EventEmitter {
  constructor() {
    super();
    this._namespaces = new Map();
  }

  // 带命名空间的事件监听
  onNS(namespace, event, listener) {
    const fullEvent = `${namespace}:${event}`;
    return this.on(fullEvent, listener);
  }

  emitNS(namespace, event, ...args) {
    const fullEvent = `${namespace}:${event}`;
    return this.emit(fullEvent, ...args);
  }

  // 移除命名空间下的所有监听器
  removeAllListenersNS(namespace) {
    const events = this.eventNames();
    for (const event of events) {
      if (event.startsWith(`${namespace}:`)) {
        this.removeAllListeners(event);
      }
    }
    return this;
  }
}

// 使用
const nsEmitter = new NamespacedEventEmitter();

nsEmitter.onNS('user', 'login', (user) => {
  console.log('用户登录:', user);
});

nsEmitter.onNS('admin', 'login', (admin) => {
  console.log('管理员登录:', admin);
});

nsEmitter.emitNS('user', 'login', { name: 'Alice' });
// 输出: 用户登录: { name: 'Alice' }

应用场景

1. 自定义事件类

const EventEmitter = require('events');

// ========== 数据库连接池 ==========
class ConnectionPool extends EventEmitter {
  constructor(maxConnections = 10) {
    super();
    this.maxConnections = maxConnections;
    this.connections = [];
    this.waiting = [];
  }

  async acquire() {
    if (this.connections.length < this.maxConnections) {
      const conn = await this.createConnection();
      this.connections.push(conn);
      this.emit('acquire', conn);
      return conn;
    }

    // 等待可用连接
    return new Promise((resolve) => {
      this.waiting.push(resolve);
      this.emit('wait');
    });
  }

  release(conn) {
    const index = this.connections.indexOf(conn);
    if (index !== -1) {
      this.connections.splice(index, 1);
      this.emit('release', conn);

      // 通知等待的客户端
      if (this.waiting.length > 0) {
        const resolve = this.waiting.shift();
        this.acquire().then(resolve);
      }
    }
  }

  async createConnection() {
    // 模拟创建连接
    return { id: Date.now(), query: () => {} };
  }
}

// 使用
const pool = new ConnectionPool(5);

pool.on('acquire', (conn) => {
  console.log('获取连接:', conn.id);
});

pool.on('wait', () => {
  console.log('等待连接...');
});

(async () => {
  const conn = await pool.acquire();
  // 使用连接
  pool.release(conn);
})();

2. 流式数据处理

const { Readable } = require('stream');
const EventEmitter = require('events');

// ========== 数据处理器 ==========
class DataProcessor extends EventEmitter {
  constructor() {
    super();
    this.processedCount = 0;
    this.errorCount = 0;
  }

  async process(data) {
    this.emit('start', data);

    try {
      // 处理数据
      const result = await this.transform(data);
      this.processedCount++;
      this.emit('success', result, data);
      return result;
    } catch (err) {
      this.errorCount++;
      this.emit('error', err, data);
      throw err;
    } finally {
      this.emit('end', data);
    }
  }

  async transform(data) {
    // 实际转换逻辑
    return data.toUpperCase();
  }
}

// 使用
const processor = new DataProcessor();

processor.on('start', (data) => console.log('开始处理:', data));
processor.on('success', (result, original) => {
  console.log('处理成功:', original, '->', result);
});
processor.on('error', (err) => console.error('处理失败:', err));
processor.on('end', () => console.log('处理结束'));

processor.process('hello');

3. 插件系统

const EventEmitter = require('events');

// ========== 可扩展的应用框架 ==========
class Application extends EventEmitter {
  constructor() {
    super();
    this.plugins = new Map();
    this.hooks = new Map();
  }

  // 注册插件
  use(plugin, options = {}) {
    if (typeof plugin !== 'function') {
      throw new TypeError('Plugin must be a function');
    }

    const name = plugin.name || `plugin_${this.plugins.size}`;

    if (this.plugins.has(name)) {
      console.warn(`Plugin ${name} already registered`);
      return this;
    }

    // 调用插件,传入app实例
    plugin(this, options);
    this.plugins.set(name, plugin);

    this.emit('plugin:registered', name);
    return this;
  }

  // 注册钩子
  hook(name, handler) {
    if (!this.hooks.has(name)) {
      this.hooks.set(name, []);
    }
    this.hooks.get(name).push(handler);
  }

  // 执行钩子
  async executeHook(name, context) {
    const handlers = this.hooks.get(name) || [];

    for (const handler of handlers) {
      await handler(context);
    }

    this.emit(`hook:${name}`, context);
  }

  async start() {
    this.emit('before:start');
    await this.executeHook('start');
    this.emit('start');
  }
}

// 定义插件
function loggerPlugin(app, options) {
  app.on('start', () => {
    console.log('Application started');
  });

  app.hook('start', async () => {
    console.log('Logger initialized');
  });
}

function databasePlugin(app, options) {
  app.hook('start', async () => {
    console.log('Connecting to database...');
    // 连接数据库
  });
}

// 使用
const app = new Application();
app.use(loggerPlugin);
app.use(databasePlugin, { url: 'mongodb://localhost' });
app.start();

4. 状态机

const EventEmitter = require('events');

// ========== 有限状态机 ==========
class StateMachine extends EventEmitter {
  constructor(initialState) {
    super();
    this.state = initialState;
    this.transitions = new Map();
  }

  // 定义状态转换
  define(fromState, event, toState, action) {
    const key = `${fromState}:${event}`;
    this.transitions.set(key, { toState, action });
  }

  // 触发事件
  async trigger(event, data) {
    const key = `${this.state}:${event}`;
    const transition = this.transitions.get(key);

    if (!transition) {
      throw new Error(`Invalid transition: ${this.state} + ${event}`);
    }

    const { toState, action } = transition;
    const fromState = this.state;

    this.emit('before:transition', { from: fromState, to: toState, event });

    try {
      if (action) {
        await action(data);
      }

      this.state = toState;
      this.emit('transition', { from: fromState, to: toState, event });
      this.emit(`state:${toState}`, data);

      return toState;
    } catch (err) {
      this.emit('transition:error', { from: fromState, event, error: err });
      throw err;
    }
  }

  getState() {
    return this.state;
  }
}

// 使用:订单状态机
const orderFsm = new StateMachine('pending');

// 定义转换
orderFsm.define('pending', 'pay', 'paid', async (data) => {
  console.log('处理支付...');
  // 调用支付接口
});

orderFsm.define('paid', 'ship', 'shipped', async (data) => {
  console.log('发货中...');
  // 调用物流接口
});

orderFsm.define('shipped', 'deliver', 'delivered');

// 监听状态变化
orderFsm.on('transition', ({ from, to }) => {
  console.log(`订单状态: ${from} -> ${to}`);
});

orderFsm.on('state:paid', () => {
  console.log('订单已支付,发送通知');
});

// 执行转换
(async () => {
  await orderFsm.trigger('pay', { amount: 100 });
  await orderFsm.trigger('ship', { tracking: '123456' });
})();

最佳实践

1. 错误处理

const EventEmitter = require('events');

class SafeEmitter extends EventEmitter {
  constructor() {
    super();
    // 必须监听error事件,否则未处理的error会导致崩溃
    this.on('error', (err) => {
      console.error('EventEmitter error:', err);
    });
  }

  safeEmit(event, ...args) {
    try {
      return this.emit(event, ...args);
    } catch (err) {
      this.emit('error', err);
      return false;
    }
  }
}

// 使用
const emitter = new SafeEmitter();

emitter.on('data', (data) => {
  if (!data) {
    throw new Error('Invalid data');
  }
  console.log(data);
});

emitter.safeEmit('data', null); // 不会崩溃

2. 内存管理

const EventEmitter = require('events');

// ========== 避免内存泄漏 ==========
class Resource extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(0); // 无限制,但要小心内存泄漏
  }

  // 使用once代替on,如果只需要监听一次
  subscribeOnce(event, handler) {
    this.once(event, handler);
  }

  // 及时移除不需要的监听器
  unsubscribe(event, handler) {
    this.off(event, handler);
  }

  // 销毁时清理所有监听器
  destroy() {
    this.removeAllListeners();
  }
}

// ========== WeakRef实现(高级)==========
// 使用WeakRef避免监听器阻止垃圾回收
class WeakEventEmitter extends EventEmitter {
  on(event, listener) {
    // 包装监听器,使用WeakRef
    const weakRef = new WeakRef(listener);
    const wrapper = (...args) => {
      const fn = weakRef.deref();
      if (fn) {
        fn(...args);
      } else {
        // 原监听器已被回收,移除包装器
        this.off(event, wrapper);
      }
    };

    return super.on(event, wrapper);
  }
}

3. 类型安全(TypeScript)

import { EventEmitter } from 'events';

// 定义事件类型
interface MyEvents {
  'user:login': (user: { id: string; name: string }) => void;
  'user:logout': (userId: string) => void;
  'error': (error: Error) => void;
}

// 类型安全的EventEmitter
class TypedEmitter extends EventEmitter {
  on<K extends keyof MyEvents>(
    event: K,
    listener: MyEvents[K]
  ): this {
    return super.on(event, listener);
  }

  emit<K extends keyof MyEvents>(
    event: K,
    ...args: Parameters<MyEvents[K]>
  ): boolean {
    return super.emit(event, ...args);
  }
}

// 使用
const emitter = new TypedEmitter();

emitter.on('user:login', (user) => {
  console.log(user.name); // 类型安全
});

// emitter.emit('user:login', 'invalid'); // 类型错误!
emitter.emit('user:login', { id: '1', name: 'Alice' }); // 正确

4. 测试

const EventEmitter = require('events');
const assert = require('assert');

// ========== 测试EventEmitter ==========
class MyClass extends EventEmitter {
  doSomething() {
    this.emit('done', 'result');
  }
}

// 测试
describe('MyClass', () => {
  it('should emit done event', (done) => {
    const obj = new MyClass();

    obj.on('done', (result) => {
      assert.strictEqual(result, 'result');
      done();
    });

    obj.doSomething();
  });

  it('should handle multiple listeners', () => {
    const obj = new MyClass();
    let count = 0;

    obj.on('done', () => count++);
    obj.on('done', () => count++);

    obj.doSomething();

    assert.strictEqual(count, 2);
  });
});

面试要点

  1. 核心概念:EventEmitter实现发布-订阅模式,是Node.js事件驱动的基础
  2. 核心方法:on/once(订阅)、emit(发布)、off/removeListener(取消订阅)
  3. 实现原理:内部使用对象存储事件和监听器数组,emit时遍历调用
  4. 继承方式:类继承EventEmitter或混入模式
  5. 注意事项:必须处理error事件、注意监听器数量限制、避免内存泄漏
  6. 应用场景:自定义事件类、流处理、插件系统、状态机

常见追问

  • Q: EventEmitter是同步还是异步的?

    • A: 默认是同步的,监听器按顺序同步执行;可以通过process.nextTick或setImmediate实现异步
  • Q: 为什么需要setMaxListeners?

    • A: 防止内存泄漏,当监听器数量超过限制时发出警告
  • Q: once是如何实现的?

    • A: 包装原始监听器,在执行时先移除自身再调用原始监听器
  • Q: 未处理的error事件会发生什么?

    • A: 会导致程序抛出异常并崩溃,必须始终监听error事件