Nodejs

Event Emitters: Patterns and Best Practices

Master Node.js EventEmitter patterns for building decoupled, event-driven architectures with proper error handling, memory management, and async integration.

Event Emitters: Patterns and Best Practices

Overview

EventEmitter is the backbone of Node.js. Streams, HTTP servers, child processes, the readline module -- they all extend EventEmitter. Understanding how to use it properly is the difference between a clean, decoupled architecture and a tangled mess of callbacks and shared state. This article covers the core API, real-world patterns for production systems, and the memory pitfalls that catch most teams off guard.

Prerequisites

  • Node.js v16+ installed (v18+ recommended)
  • Familiarity with JavaScript callbacks and basic asynchronous patterns
  • Working knowledge of Express.js (for the decoupling examples)
  • A terminal and a text editor

EventEmitter Fundamentals

The events module ships with Node.js. No npm install required.

var EventEmitter = require('events');

var emitter = new EventEmitter();

emitter.on('data', function(payload) {
    console.log('Received:', payload);
});

emitter.emit('data', { id: 1, name: 'test' });
Received: { id: 1, name: 'test' }

That is the entire API in its simplest form. on() registers a listener. emit() fires the event. Listeners execute synchronously in the order they were registered. This is a critical detail -- if one listener throws, the listeners registered after it never run.

Core API Methods

Here is the full list of methods you will actually use:

var EventEmitter = require('events');
var emitter = new EventEmitter();

// Register a listener
emitter.on('event', handler);

// Register a one-time listener (auto-removes after first call)
emitter.once('event', handler);

// Remove a specific listener
emitter.removeListener('event', handler);
// Alias:
emitter.off('event', handler);

// Remove all listeners for an event (or all events)
emitter.removeAllListeners('event');
emitter.removeAllListeners();

// Emit an event with arguments
emitter.emit('event', arg1, arg2);

// Get listener count
emitter.listenerCount('event');

// Get registered listeners
emitter.listeners('event');

// Prepend a listener (runs before previously registered listeners)
emitter.prependListener('event', handler);
emitter.prependOnceListener('event', handler);

The return value of emit() is a boolean -- true if there were listeners for the event, false if there were none. This is useful for detecting unhandled events.

var handled = emitter.emit('someEvent');
if (!handled) {
    console.warn('No listeners registered for someEvent');
}

Synchronous Execution Order

Listeners are called synchronously and in registration order. This is not a suggestion -- it is a guarantee from the Node.js runtime.

var EventEmitter = require('events');
var emitter = new EventEmitter();

emitter.on('process', function() {
    console.log('First listener');
});

emitter.on('process', function() {
    console.log('Second listener');
});

emitter.on('process', function() {
    console.log('Third listener');
});

console.log('Before emit');
emitter.emit('process');
console.log('After emit');
Before emit
First listener
Second listener
Third listener
After emit

Notice that "After emit" prints last. All three listeners complete before emit() returns. If you need async behavior, you have to build it explicitly.


Listener Management

The once Method

Use once() when you only care about the first occurrence of an event. The listener automatically removes itself after one invocation.

var EventEmitter = require('events');
var emitter = new EventEmitter();

emitter.once('connect', function() {
    console.log('Connected -- this only runs once');
});

emitter.emit('connect'); // prints the message
emitter.emit('connect'); // nothing happens

This is particularly useful for initialization events, one-time setup, or waiting for a resource to become available.

Removing Listeners

To remove a listener, you need a reference to the original function. Anonymous functions cannot be removed individually.

var EventEmitter = require('events');
var emitter = new EventEmitter();

// This works
function onData(data) {
    console.log(data);
}
emitter.on('data', onData);
emitter.removeListener('data', onData);

// This does NOT work
emitter.on('data', function(data) {
    console.log(data);
});
// Cannot remove -- no reference to the anonymous function

This is why I always recommend using named functions for listeners that may need to be removed. Anonymous listeners are fine for fire-and-forget scenarios, but the moment you need lifecycle control, named functions are mandatory.

The maxListeners Warning

By default, EventEmitter warns if you register more than 10 listeners for a single event.

(node:12345) MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
11 data listeners added to [EventEmitter]. Use emitter.setMaxListeners() to increase limit

This warning exists because registering many listeners on the same event is often a symptom of a listener leak -- code that adds listeners in a loop or on every request without ever removing them. Before you blindly increase the limit, check if you actually have a leak.

var EventEmitter = require('events');
var emitter = new EventEmitter();

// Check if you genuinely need more listeners
console.log('Default max:', emitter.getMaxListeners()); // 10

// If you legitimately need more (e.g., a message bus with many subscribers)
emitter.setMaxListeners(50);

// Set globally for all new emitters
EventEmitter.defaultMaxListeners = 20;

// Set to 0 or Infinity to disable the warning entirely (not recommended)
emitter.setMaxListeners(0);

In my experience, if you need more than 10-15 listeners on a single event, your architecture probably needs rethinking. The exception is a centralized event bus used across an entire application, where 20-30 listeners can be legitimate.


Error Handling with Error Events

The error event is special in Node.js. If an error event is emitted and there are no listeners for it, Node.js throws the error and crashes the process.

var EventEmitter = require('events');
var emitter = new EventEmitter();

// This crashes the process
emitter.emit('error', new Error('Something went wrong'));
events.js:292
      throw er; // Unhandled 'error' event
      ^

Error: Something went wrong
    at Object.<anonymous> (/app/test.js:4:29)

Always register an error listener on any EventEmitter you create or consume.

var EventEmitter = require('events');
var emitter = new EventEmitter();

emitter.on('error', function(err) {
    console.error('Emitter error:', err.message);
    // Log it, report it, handle it -- but do not crash
});

emitter.emit('error', new Error('Something went wrong'));
Emitter error: Something went wrong

For subclasses of EventEmitter that you author, consider setting a default error handler in the constructor so consumers are not silently exposed to crash risk.

Capturing Error Context

Always pass Error objects, not strings or plain objects. Error objects carry a stack trace, which is critical for debugging.

// Bad
emitter.emit('error', 'something broke');
emitter.emit('error', { code: 500, message: 'failure' });

// Good
emitter.emit('error', new Error('Connection to database lost'));

// Better -- custom error with context
var err = new Error('Failed to process job');
err.jobId = 'abc-123';
err.retryCount = 3;
emitter.emit('error', err);

Async Event Patterns

EventEmitter listeners are synchronous. But real-world event handlers often need to do async work -- database writes, HTTP calls, file I/O. There are several patterns for handling this cleanly.

Pattern 1: Async Listener with Error Forwarding

var EventEmitter = require('events');
var emitter = new EventEmitter();

emitter.on('user:created', function(user) {
    sendWelcomeEmail(user)
        .then(function() {
            console.log('Welcome email sent to', user.email);
        })
        .catch(function(err) {
            emitter.emit('error', err);
        });
});

function sendWelcomeEmail(user) {
    return new Promise(function(resolve, reject) {
        setTimeout(function() {
            if (!user.email) {
                reject(new Error('No email address for user ' + user.id));
            } else {
                resolve();
            }
        }, 100);
    });
}

Pattern 2: Using events.once() with Promises

Node.js provides a static once() function that returns a Promise. This is the bridge between EventEmitter and async/await-style code.

var events = require('events');
var net = require('net');

function waitForConnection(port) {
    var server = net.createServer();
    server.listen(port);

    return events.once(server, 'listening').then(function() {
        console.log('Server is listening on port', port);
        return server;
    });
}

waitForConnection(3000).then(function(server) {
    console.log('Ready to accept connections');
    server.close();
});
Server is listening on port 3000
Ready to accept connections

The events.once() utility also handles the error case -- if an error event fires before the target event, the returned Promise rejects.

Pattern 3: Event-Driven Pipeline

Chain events to build processing pipelines where each stage emits the next event.

var EventEmitter = require('events');

function createPipeline() {
    var pipeline = new EventEmitter();

    pipeline.on('step:validate', function(data) {
        console.log('Validating...');
        if (!data.name) {
            pipeline.emit('error', new Error('Name is required'));
            return;
        }
        data.validated = true;
        pipeline.emit('step:transform', data);
    });

    pipeline.on('step:transform', function(data) {
        console.log('Transforming...');
        data.name = data.name.toUpperCase();
        data.transformed = true;
        pipeline.emit('step:save', data);
    });

    pipeline.on('step:save', function(data) {
        console.log('Saving...');
        data.saved = true;
        pipeline.emit('step:complete', data);
    });

    pipeline.on('error', function(err) {
        console.error('Pipeline error:', err.message);
    });

    return pipeline;
}

var pipeline = createPipeline();

pipeline.on('step:complete', function(data) {
    console.log('Pipeline complete:', data);
});

pipeline.emit('step:validate', { name: 'Shane' });
Validating...
Transforming...
Saving...
Pipeline complete: { name: 'SHANE', validated: true, transformed: true, saved: true }

Custom Event Emitters

The most powerful pattern is extending EventEmitter to create domain-specific objects that emit events as part of their lifecycle.

var EventEmitter = require('events');
var util = require('util');

function DatabaseConnection(config) {
    EventEmitter.call(this);
    this.config = config;
    this.connected = false;
}

util.inherits(DatabaseConnection, EventEmitter);

DatabaseConnection.prototype.connect = function() {
    var self = this;
    self.emit('connecting', self.config.host);

    // Simulate async connection
    setTimeout(function() {
        self.connected = true;
        self.emit('connected', { host: self.config.host, latency: 23 });
    }, 200);
};

DatabaseConnection.prototype.query = function(sql) {
    var self = this;
    if (!self.connected) {
        self.emit('error', new Error('Not connected'));
        return;
    }

    self.emit('query:start', sql);
    var start = Date.now();

    setTimeout(function() {
        var duration = Date.now() - start;
        var results = [{ id: 1 }, { id: 2 }];
        self.emit('query:complete', { sql: sql, duration: duration, rows: results.length });
    }, 50);
};

DatabaseConnection.prototype.disconnect = function() {
    var self = this;
    self.connected = false;
    self.emit('disconnected');
};

// Usage
var db = new DatabaseConnection({ host: 'localhost', port: 5432 });

db.on('connecting', function(host) { console.log('Connecting to', host + '...'); });
db.on('connected', function(info) { console.log('Connected:', info); });
db.on('query:start', function(sql) { console.log('Executing:', sql); });
db.on('query:complete', function(info) { console.log('Query done:', info); });
db.on('disconnected', function() { console.log('Disconnected'); });
db.on('error', function(err) { console.error('DB Error:', err.message); });

db.connect();
setTimeout(function() {
    db.query('SELECT * FROM users');
}, 300);
setTimeout(function() {
    db.disconnect();
}, 500);
Connecting to localhost...
Connected: { host: 'localhost', latency: 23 }
Executing: SELECT * FROM users
Query done: { sql: 'SELECT * FROM users', duration: 52, rows: 2 }
Disconnected

This pattern gives consumers full visibility into the object's lifecycle without coupling them to the implementation. The database connection does not know or care who is listening.


Real-World Patterns

Pub/Sub Within Express Applications

One of the most effective uses of EventEmitter is decoupling side effects from request handlers in Express applications.

var express = require('express');
var EventEmitter = require('events');

var app = express();
var appEvents = new EventEmitter();
appEvents.setMaxListeners(20);

app.use(express.json());

// --- Event handlers (side effects) ---

appEvents.on('order:placed', function(order) {
    console.log('[Inventory] Reserving stock for order', order.id);
    // inventoryService.reserve(order.items);
});

appEvents.on('order:placed', function(order) {
    console.log('[Email] Sending confirmation to', order.customerEmail);
    // emailService.sendOrderConfirmation(order);
});

appEvents.on('order:placed', function(order) {
    console.log('[Analytics] Tracking order', order.id, '- total:', order.total);
    // analytics.track('order_placed', { orderId: order.id, total: order.total });
});

appEvents.on('order:placed', function(order) {
    console.log('[Notifications] Alerting warehouse for order', order.id);
    // notificationService.alertWarehouse(order);
});

// --- Route handler (clean, focused) ---

app.post('/orders', function(req, res) {
    var order = {
        id: 'ord-' + Date.now(),
        items: req.body.items,
        customerEmail: req.body.email,
        total: req.body.total,
        createdAt: new Date()
    };

    // Save to database
    // db.orders.insert(order);

    // Emit event -- all side effects happen elsewhere
    appEvents.emit('order:placed', order);

    res.status(201).json({ orderId: order.id });
});

app.listen(3000, function() {
    console.log('Server running on port 3000');
});

The route handler does one thing: creates the order and responds. Inventory, email, analytics, and notifications are all separate concerns wired through events. Adding a new side effect means adding a new listener -- the route handler never changes.

Decoupling Business Logic

For larger applications, I structure event handlers as separate modules:

// events/bus.js
var EventEmitter = require('events');
var bus = new EventEmitter();
bus.setMaxListeners(30);
module.exports = bus;

// events/handlers/emailHandler.js
var bus = require('../bus');

bus.on('user:registered', function(user) {
    console.log('Sending welcome email to', user.email);
});

bus.on('user:password-reset', function(user) {
    console.log('Sending password reset email to', user.email);
});

bus.on('order:placed', function(order) {
    console.log('Sending order confirmation to', order.customerEmail);
});

// events/handlers/auditHandler.js
var bus = require('../bus');

bus.on('user:registered', function(user) {
    console.log('Audit log: user registered -', user.id);
});

bus.on('user:login', function(user) {
    console.log('Audit log: user login -', user.id);
});

// app.js
require('./events/handlers/emailHandler');
require('./events/handlers/auditHandler');
var bus = require('./events/bus');

// In your route handlers
bus.emit('user:registered', { id: 'usr-1', email: '[email protected]' });

The key insight is that require() executes the handler modules, which register their listeners on the shared bus. The bus module is a singleton because Node.js caches require() results. Every file that requires ./events/bus gets the same EventEmitter instance.


Memory Leak Detection and Prevention

EventEmitter memory leaks are one of the most common production issues in Node.js applications. They happen when listeners are added repeatedly without being removed.

The Classic Leak Pattern

var EventEmitter = require('events');
var emitter = new EventEmitter();

// Simulating a request handler that adds listeners
function handleRequest(req) {
    // BUG: This adds a new listener on every request
    emitter.on('config:updated', function() {
        console.log('Config updated for request', req.id);
    });
}

// After 11 requests, you get the MaxListenersExceededWarning
for (var i = 0; i < 15; i++) {
    handleRequest({ id: i });
}

console.log('Listener count:', emitter.listenerCount('config:updated'));
(node:54321) MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
11 config:updated listeners added to [EventEmitter]. Use emitter.setMaxListeners() to increase limit
Listener count: 15

The Fix: Listener Lifecycle Management

var EventEmitter = require('events');
var emitter = new EventEmitter();

function handleRequest(req) {
    function onConfigUpdate() {
        console.log('Config updated for request', req.id);
    }

    emitter.on('config:updated', onConfigUpdate);

    // Clean up when the request finishes
    req.on('close', function() {
        emitter.removeListener('config:updated', onConfigUpdate);
    });
}

Using once to Prevent Leaks

If a listener should only fire once, use once() instead of on(). This is especially important in request-scoped code.

var EventEmitter = require('events');
var emitter = new EventEmitter();

function waitForReady() {
    // once() auto-removes -- no leak possible
    emitter.once('ready', function() {
        console.log('System is ready');
    });
}

Detecting Leaks in Production

Use the newListener and removeListener events to monitor listener registration:

var EventEmitter = require('events');
var emitter = new EventEmitter();

emitter.on('newListener', function(eventName) {
    var count = emitter.listenerCount(eventName) + 1;
    if (count > 5) {
        console.warn('WARNING: ' + count + ' listeners for "' + eventName + '"');
        console.trace('Listener added from:');
    }
});

You can also expose listener counts as metrics:

function getEmitterMetrics(emitter) {
    var events = emitter.eventNames();
    var metrics = {};
    for (var i = 0; i < events.length; i++) {
        metrics[events[i]] = emitter.listenerCount(events[i]);
    }
    return metrics;
}

// Check periodically
setInterval(function() {
    var metrics = getEmitterMetrics(appEvents);
    console.log('Event listener counts:', metrics);
}, 30000);

EventEmitter vs Streams vs Observables

These three patterns overlap but serve different purposes. Knowing when to use each is important.

EventEmitter

Use when you need to signal lifecycle events, state changes, or wire up decoupled components. Events are fire-and-forget -- emitters do not wait for listeners to finish and do not handle backpressure.

Best for: Application events, pub/sub, object lifecycles, decoupling modules.

Streams

Streams extend EventEmitter but add flow control. They handle backpressure -- when the consumer is slower than the producer, streams automatically pause the producer. Use streams when you are moving data from point A to point B.

Best for: File I/O, HTTP request/response bodies, data transformation pipelines, anything involving continuous data flow.

var fs = require('fs');
var stream = fs.createReadStream('/tmp/large-file.csv');

// Streams ARE event emitters
stream.on('data', function(chunk) { /* ... */ });
stream.on('end', function() { /* ... */ });
stream.on('error', function(err) { /* ... */ });

Observables (RxJS)

Observables are not built into Node.js -- they come from libraries like RxJS. They add operators for filtering, mapping, debouncing, and combining event streams. Use them when you need complex event composition.

Best for: Complex UI event handling, real-time data processing with operators, combining multiple async streams.

// If you need debounce, throttle, merge, switchMap --
// Observables are the right tool. But for most Node.js
// server-side work, EventEmitter is sufficient.

My rule of thumb: start with EventEmitter. Move to streams if you need backpressure. Bring in RxJS only if you need composition operators and cannot achieve the same result with a few well-placed listeners.


Complete Working Example: Job Processing System

Here is a complete job processing system that uses EventEmitter for lifecycle management. It includes logging, metrics collection, and retry logic -- all wired through events.

var EventEmitter = require('events');
var util = require('util');

// --- Job Queue ---

function JobQueue(options) {
    EventEmitter.call(this);
    this.name = options.name || 'default';
    this.concurrency = options.concurrency || 3;
    this.maxRetries = options.maxRetries || 3;
    this.retryDelay = options.retryDelay || 1000;
    this.queue = [];
    this.active = 0;
    this.processed = 0;
    this.failed = 0;
    this.metrics = {
        totalProcessed: 0,
        totalFailed: 0,
        totalRetried: 0,
        avgDuration: 0,
        durations: []
    };
}

util.inherits(JobQueue, EventEmitter);

JobQueue.prototype.add = function(jobData) {
    var job = {
        id: 'job-' + Date.now() + '-' + Math.random().toString(36).substr(2, 6),
        data: jobData,
        attempts: 0,
        maxRetries: this.maxRetries,
        createdAt: new Date(),
        status: 'pending'
    };

    this.queue.push(job);
    this.emit('job:created', job);
    this._processNext();
    return job;
};

JobQueue.prototype._processNext = function() {
    var self = this;

    while (self.active < self.concurrency && self.queue.length > 0) {
        var job = self.queue.shift();
        self.active++;
        self._executeJob(job);
    }
};

JobQueue.prototype._executeJob = function(job) {
    var self = this;
    job.status = 'running';
    job.attempts++;
    job.startedAt = new Date();

    self.emit('job:started', job);

    // Simulate progress updates
    var progress = 0;
    var progressInterval = setInterval(function() {
        progress += Math.floor(Math.random() * 30) + 10;
        if (progress > 100) progress = 100;
        self.emit('job:progress', job, progress);
        if (progress >= 100) {
            clearInterval(progressInterval);
        }
    }, 200);

    // Simulate job execution
    self._runJobHandler(job, function(err) {
        clearInterval(progressInterval);
        job.completedAt = new Date();
        var duration = job.completedAt - job.startedAt;

        if (err) {
            job.status = 'failed';
            job.lastError = err;
            self.emit('job:failed', job, err);

            // Retry logic
            if (job.attempts < job.maxRetries) {
                self.metrics.totalRetried++;
                self.emit('job:retry', job, job.attempts);

                setTimeout(function() {
                    self.queue.unshift(job);
                    self.active--;
                    self._processNext();
                }, self.retryDelay * job.attempts);
            } else {
                self.metrics.totalFailed++;
                self.active--;
                self.emit('job:exhausted', job);
                self._processNext();
            }
        } else {
            job.status = 'completed';
            self.metrics.totalProcessed++;
            self.metrics.durations.push(duration);
            self.metrics.avgDuration = self.metrics.durations.reduce(function(a, b) { return a + b; }, 0) / self.metrics.durations.length;

            self.emit('job:completed', job, duration);
            self.active--;
            self._processNext();
        }
    });
};

JobQueue.prototype._runJobHandler = function(job, callback) {
    // Simulate work with random failures
    setTimeout(function() {
        var shouldFail = Math.random() < 0.3; // 30% failure rate
        if (shouldFail) {
            callback(new Error('Processing failed for job ' + job.id + ': timeout'));
        } else {
            callback(null);
        }
    }, Math.floor(Math.random() * 500) + 200);
};

JobQueue.prototype.getMetrics = function() {
    return {
        name: this.name,
        queued: this.queue.length,
        active: this.active,
        totalProcessed: this.metrics.totalProcessed,
        totalFailed: this.metrics.totalFailed,
        totalRetried: this.metrics.totalRetried,
        avgDuration: Math.round(this.metrics.avgDuration) + 'ms'
    };
};

// --- Logger Plugin ---

function attachLogger(queue) {
    queue.on('job:created', function(job) {
        console.log('[' + timestamp() + '] [LOG] Job created: ' + job.id + ' - data:', JSON.stringify(job.data));
    });

    queue.on('job:started', function(job) {
        console.log('[' + timestamp() + '] [LOG] Job started: ' + job.id + ' (attempt ' + job.attempts + ')');
    });

    queue.on('job:progress', function(job, progress) {
        console.log('[' + timestamp() + '] [LOG] Job ' + job.id + ' progress: ' + progress + '%');
    });

    queue.on('job:completed', function(job, duration) {
        console.log('[' + timestamp() + '] [LOG] Job completed: ' + job.id + ' in ' + duration + 'ms');
    });

    queue.on('job:failed', function(job, err) {
        console.error('[' + timestamp() + '] [LOG] Job failed: ' + job.id + ' - ' + err.message);
    });

    queue.on('job:retry', function(job, attempt) {
        console.warn('[' + timestamp() + '] [LOG] Job retrying: ' + job.id + ' (attempt ' + attempt + ' of ' + job.maxRetries + ')');
    });

    queue.on('job:exhausted', function(job) {
        console.error('[' + timestamp() + '] [LOG] Job exhausted all retries: ' + job.id);
    });
}

// --- Metrics Collector Plugin ---

function attachMetrics(queue) {
    var counts = {
        created: 0,
        started: 0,
        completed: 0,
        failed: 0,
        retried: 0
    };

    queue.on('job:created', function() { counts.created++; });
    queue.on('job:started', function() { counts.started++; });
    queue.on('job:completed', function() { counts.completed++; });
    queue.on('job:failed', function() { counts.failed++; });
    queue.on('job:retry', function() { counts.retried++; });

    queue.on('error', function(err) {
        console.error('[METRICS] Unhandled queue error:', err.message);
    });

    // Expose metrics endpoint
    return {
        getCounts: function() { return Object.assign({}, counts); },
        reset: function() {
            counts.created = 0;
            counts.started = 0;
            counts.completed = 0;
            counts.failed = 0;
            counts.retried = 0;
        }
    };
}

// --- Alert Plugin ---

function attachAlerts(queue, threshold) {
    threshold = threshold || 3;
    var consecutiveFailures = 0;

    queue.on('job:completed', function() {
        consecutiveFailures = 0;
    });

    queue.on('job:exhausted', function(job) {
        consecutiveFailures++;
        if (consecutiveFailures >= threshold) {
            console.error('[ALERT] ' + consecutiveFailures + ' consecutive job failures! Last job: ' + job.id);
            console.error('[ALERT] Last error: ' + (job.lastError ? job.lastError.message : 'unknown'));
            // In production: send PagerDuty alert, Slack notification, etc.
        }
    });
}

// --- Utility ---

function timestamp() {
    return new Date().toISOString().substr(11, 12);
}

// --- Run the system ---

var queue = new JobQueue({
    name: 'email-processor',
    concurrency: 2,
    maxRetries: 3,
    retryDelay: 500
});

// Wire up plugins via events
attachLogger(queue);
var metrics = attachMetrics(queue);
attachAlerts(queue, 3);

// Add some jobs
queue.add({ type: 'welcome-email', to: '[email protected]' });
queue.add({ type: 'receipt', to: '[email protected]', orderId: 'ord-001' });
queue.add({ type: 'newsletter', to: '[email protected]' });
queue.add({ type: 'password-reset', to: '[email protected]' });
queue.add({ type: 'welcome-email', to: '[email protected]' });

// Print metrics after processing
setTimeout(function() {
    console.log('\n--- Final Metrics ---');
    console.log('Queue metrics:', queue.getMetrics());
    console.log('Event counts:', metrics.getCounts());
}, 8000);

Sample output:

[14:23:01.123] [LOG] Job created: job-1707401581123-x8k2m1 - data: {"type":"welcome-email","to":"[email protected]"}
[14:23:01.124] [LOG] Job created: job-1707401581124-p3n7q9 - data: {"type":"receipt","to":"[email protected]","orderId":"ord-001"}
[14:23:01.124] [LOG] Job started: job-1707401581123-x8k2m1 (attempt 1)
[14:23:01.124] [LOG] Job started: job-1707401581124-p3n7q9 (attempt 1)
[14:23:01.125] [LOG] Job created: job-1707401581125-r2t4v6 - data: {"type":"newsletter","to":"[email protected]"}
[14:23:01.125] [LOG] Job created: job-1707401581125-w5y7a9 - data: {"type":"password-reset","to":"[email protected]"}
[14:23:01.126] [LOG] Job created: job-1707401581126-b8d0f2 - data: {"type":"welcome-email","to":"[email protected]"}
[14:23:01.325] [LOG] Job job-1707401581123-x8k2m1 progress: 28%
[14:23:01.325] [LOG] Job job-1707401581124-p3n7q9 progress: 15%
[14:23:01.451] [LOG] Job completed: job-1707401581123-x8k2m1 in 327ms
[14:23:01.451] [LOG] Job started: job-1707401581125-r2t4v6 (attempt 1)
[14:23:01.525] [LOG] Job job-1707401581124-p3n7q9 progress: 52%
[14:23:01.612] [LOG] Job failed: job-1707401581124-p3n7q9 - Processing failed for job job-1707401581124-p3n7q9: timeout
[14:23:01.612] [LOG] Job retrying: job-1707401581124-p3n7q9 (attempt 1 of 3)
[14:23:01.725] [LOG] Job job-1707401581125-r2t4v6 progress: 41%
[14:23:01.892] [LOG] Job completed: job-1707401581125-r2t4v6 in 441ms
[14:23:02.112] [LOG] Job started: job-1707401581124-p3n7q9 (attempt 2)
[14:23:02.450] [LOG] Job completed: job-1707401581124-p3n7q9 in 338ms

--- Final Metrics ---
Queue metrics: { name: 'email-processor', queued: 0, active: 0, totalProcessed: 5, totalFailed: 0, totalRetried: 1, avgDuration: '362ms' }
Event counts: { created: 5, started: 6, completed: 5, failed: 1, retried: 1 }

The power of this pattern is the plugin architecture. The JobQueue knows nothing about logging, metrics, or alerting. Each plugin is a self-contained module that hooks into the lifecycle through events. You can add, remove, or replace plugins without touching the core queue code.


Common Issues & Troubleshooting

1. MaxListenersExceededWarning in Production

(node:8872) MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
11 request listeners added to [Server]. Use emitter.setMaxListeners() to increase limit

Cause: Listeners are being added inside a loop, inside a request handler, or inside a recurring callback without being removed. This is the most common EventEmitter bug.

Fix: Audit where on() is called. If it is inside a function that runs more than once, ensure there is a corresponding removeListener() or use once() instead.

// Find the leak by tracking listener registration
process.on('warning', function(warning) {
    if (warning.name === 'MaxListenersExceededWarning') {
        console.error(warning.stack);
    }
});

2. Unhandled Error Event Crashes Process

events.js:292
      throw er; // Unhandled 'error' event
      ^

Error: ECONNREFUSED 127.0.0.1:5432

Cause: An error event was emitted on an EventEmitter with no error listener. Node.js treats this as a fatal exception.

Fix: Always attach an error listener to every EventEmitter you create. For safety, also attach a global handler:

process.on('uncaughtException', function(err) {
    console.error('Uncaught exception:', err);
    process.exit(1);
});

3. Listener Executes Multiple Times Unexpectedly

Processing order: ord-001
Processing order: ord-001
Processing order: ord-001
Processing order: ord-001

Cause: The listener was registered multiple times. This often happens when on() is called inside a function that executes on each request or each connection.

Fix: Check your registration code. Use listenerCount() to verify:

console.log('Listeners for order:placed:', emitter.listenerCount('order:placed'));
// If this grows over time, you have a registration leak

Alternatively, guard against duplicate registration:

function safeOn(emitter, event, handler) {
    var existing = emitter.listeners(event);
    for (var i = 0; i < existing.length; i++) {
        if (existing[i] === handler) {
            return; // Already registered
        }
    }
    emitter.on(event, handler);
}

4. Async Errors Silently Swallowed

emitter.on('data', function(item) {
    processAsync(item); // Returns a promise
    // If processAsync rejects, nobody catches it
});
(node:9012) UnhandledPromiseRejectionWarning: Error: Database write failed

Cause: An async function inside a synchronous listener throws or rejects, but there is no .catch() or try/catch to handle it.

Fix: Always handle promise rejections inside event listeners:

emitter.on('data', function(item) {
    processAsync(item).catch(function(err) {
        emitter.emit('error', err);
    });
});

5. Events Fire Before Listeners Are Registered

var emitter = createConnection();
emitter.emit('ready'); // Fires immediately

// This listener never gets called
emitter.on('ready', function() {
    console.log('Connected!');
});

Cause: If the event is emitted synchronously in the constructor or factory function, listeners added after creation will miss it.

Fix: Defer the initial emit using process.nextTick():

function createConnection() {
    var emitter = new EventEmitter();

    // Defer so consumers can attach listeners first
    process.nextTick(function() {
        emitter.emit('ready');
    });

    return emitter;
}

Best Practices

  • Always register an error listener. Every EventEmitter you create or consume should have an error event handler. Unhandled error events crash the process. No exceptions.

  • Use named functions for removable listeners. If you will ever need to call removeListener(), the listener must be a named function you hold a reference to. Anonymous inline functions are impossible to remove individually.

  • Prefer once() for single-use listeners. Initialization events, connection events, and one-time responses should use once() to prevent listener accumulation. This eliminates an entire class of memory leaks.

  • Keep listeners fast. Listeners execute synchronously and block the emit call. If a listener does heavy computation, it delays every listener registered after it. Move CPU-intensive work to setImmediate() or a worker thread.

  • Use namespaced event names. Prefix events with their domain: user:created, order:placed, job:failed. This prevents collisions on shared emitters and makes the codebase searchable.

  • Do not increase maxListeners blindly. The warning exists for a reason. If you are hitting the limit, investigate whether you have a listener leak before calling setMaxListeners(). Ninety percent of the time, the warning is catching a real bug.

  • Emit Error objects, not strings. Always emit proper Error instances so the stack trace is preserved. Attach context properties to the error object (err.jobId, err.userId) rather than using string concatenation.

  • Defer events from constructors with process.nextTick(). If your constructor or factory function emits an event, wrap it in process.nextTick() so that callers have time to attach listeners before the event fires.

  • Monitor listener counts in production. Expose listenerCount() values as application metrics. A steadily growing listener count on any event is a memory leak in progress.

  • Document your events. If you publish a module or library that extends EventEmitter, list every event it emits, the arguments it passes, and when each event fires. Consumers should not have to read your source code to use your events.


References

Powered by Contentful