CQRS (Command Query Responsibility Segregation) and Event Sourcing are two patterns that often get paired together. CQRS splits reads from writes. Event sourcing stores every state change as an immutable event. Combined, you get full audit trails, the ability to reconstruct state at any point in time, and the option to scale reads independently from writes.
The payoff is real in some domains. Some financial services teams report read performance gains of around 10x. E-commerce teams have cut read-side infrastructure costs by as much as 70% on read-heavy workloads. Regulated industries get the auditability they need without bolting it on. The cost is real too: eventual consistency, event versioning headaches, and projection rebuilds that surprise people the first time they need one. With CQRS adoption up 28% in financial services and purpose-built stores such as EventStoreDB now mature (version 23.10 shipped in October 2023), these patterns are usable in production without inventing everything from scratch.
This guide covers when to use CQRS and event sourcing, how to implement them, and the pitfalls that turn well-meaning architectures into over-engineered ones. These patterns sit naturally alongside event-driven architecture with Kafka for distributing events across services, and they pair with database-per-service decisions when each bounded context owns its own store.
How CQRS works
CQRS separates read and write operations into distinct models:
Traditional approach: Single model for reads and writes
// Traditional CRUD - same model for everything
class OrderService {
async createOrder(data: CreateOrderDTO): Promise<Order> {
const order = new Order(data);
await this.repository.save(order);
return order; // Same model used for write and read
}
async getOrder(id: string): Promise<Order> {
return this.repository.findById(id); // Same model
}
async updateOrder(id: string, data: UpdateOrderDTO): Promise<Order> {
const order = await this.repository.findById(id);
order.update(data);
await this.repository.save(order);
return order; // Same model
}
}
CQRS approach: Separate models for commands (writes) and queries (reads)
// Command side - optimized for writes and business logic
class OrderCommandHandler {
async handle(command: CreateOrderCommand): Promise<void> {
// Domain model focused on business rules
const order = Order.create(command);
order.validate();
order.apply(new OrderCreatedEvent(...));
await this.eventStore.save(order);
// No return - commands don't return data
}
}
// Query side - optimized for reads and reporting
class OrderQueryHandler {
  // One method per query: a TypeScript class cannot have two implementations named handle()
  async getOrderDetails(query: GetOrderDetailsQuery): Promise<OrderDetailsDTO> {
    // Read model optimized for this specific query
    // Could be denormalized, pre-joined, cached
    return this.readRepository.getOrderDetails(query.orderId);
  }
  async getCustomerOrderHistory(query: GetCustomerOrderHistoryQuery): Promise<OrderSummaryDTO[]> {
    // Different read model for this query
    // Optimized with different indexes, structure
    return this.readRepository.getCustomerOrderHistory(query.customerId);
  }
}
Why anyone would do this:
- Reads and writes scale separately. Read-heavy systems serve far more reads than writes (ratios around 100:1 are not unusual), so this matters.
- The write model can be optimized for business rules. The read models can be optimized for the specific queries the UI actually makes.
- You can use different storage for each side. PostgreSQL for writes, Elasticsearch for full-text search, Redis for hot reads.
- Different teams can own the command and query sides without stepping on each other.
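To make the split concrete, here is a minimal in-memory sketch. Everything in it (the `OrderPlaced` event shape, the two `Map`-backed read models, `placeOrder`, `getCustomerSpend`) is hypothetical and exists only for illustration. The projection runs synchronously here, whereas a real system would usually update read models asynchronously.

```typescript
// Hypothetical in-memory sketch: one command path, two read models.
type OrderPlaced = { type: "OrderPlaced"; orderId: string; customerId: string; total: number };

// Read model 1: order details keyed by order id.
const orderDetails = new Map<string, { customerId: string; total: number }>();
// Read model 2: per-customer spend, pre-aggregated for a dashboard query.
const customerSpend = new Map<string, number>();

// Projection: the only code that writes to the read models.
function project(event: OrderPlaced): void {
  orderDetails.set(event.orderId, { customerId: event.customerId, total: event.total });
  customerSpend.set(event.customerId, (customerSpend.get(event.customerId) ?? 0) + event.total);
}

// Command side: validates, emits an event, returns nothing to the caller.
function placeOrder(orderId: string, customerId: string, total: number): void {
  if (total <= 0) throw new Error("Order total must be positive");
  project({ type: "OrderPlaced", orderId, customerId, total }); // synchronous only in this sketch
}

// Query side: reads pre-shaped data, no business logic.
function getCustomerSpend(customerId: string): number {
  return customerSpend.get(customerId) ?? 0;
}

placeOrder("ord-1", "cust-1", 40);
placeOrder("ord-2", "cust-1", 60);
console.log(getCustomerSpend("cust-1")); // 100
```

The dashboard query never touches order rows. It reads a number that was computed at write time, which is the trade the bullets above describe.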
How event sourcing works
Event Sourcing stores every state change as an immutable event:
Traditional approach: Store current state
// Traditional state storage
interface Order {
id: string;
status: OrderStatus; // Only current status
totalAmount: number;
updatedAt: Date;
// Lost history: Who changed status? When? Why?
}
// Update loses history
await db.update('orders', orderId, { status: 'Shipped', updatedAt: new Date() });
// Previous status is gone forever
Event Sourcing approach: Store all events
// Event sourcing - store every state change
interface OrderEvent {
eventId: string;
aggregateId: string;
eventType: string;
data: any;
metadata: {
timestamp: Date;
userId: string;
reason?: string;
};
}
// Events stored immutably
const events: OrderEvent[] = [
{ eventId: '1', aggregateId: 'ord-123', eventType: 'OrderCreated', data: {...}, metadata: {...} },
{ eventId: '2', aggregateId: 'ord-123', eventType: 'OrderPaid', data: {...}, metadata: {...} },
{ eventId: '3', aggregateId: 'ord-123', eventType: 'OrderShipped', data: {...}, metadata: {...} },
// Complete history preserved forever
];
// Rebuild current state by replaying events
function replayEvents(events: OrderEvent[]): Order {
let order = new Order();
for (const event of events) {
order = order.apply(event); // Apply each event
}
return order; // Current state reconstructed
}
// Time travel: Rebuild state at any point in history
function getStateAt(events: OrderEvent[], timestamp: Date): Order {
const pastEvents = events.filter(e => e.metadata.timestamp <= timestamp);
return replayEvents(pastEvents);
}
What you get out of event sourcing:
- A complete audit trail. Every change records who did it, when, and (if you bother to add it to metadata) why.
- The ability to reconstruct state at any point in history. Useful for debugging, useful for legal, useful for analytics.
- Event replay. You can rebuild projections from scratch, retroactively fix bugs by replaying with corrected code, or spin up a new read model that didn't exist before.
- Insight into how entities actually evolved over time, not just their current state.
- Debugging gets much easier because the exact sequence of events that produced the current state is right there.
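Replay and time travel are easier to trust once you have run them. The sketch below uses a hypothetical account with two event types and treats state as a fold over the history. The event names, the `at` field, and the helper names are assumptions made for this example, not part of any library.

```typescript
type AccountEvent =
  | { type: "Deposited"; amount: number; at: string }
  | { type: "Withdrawn"; amount: number; at: string };

// Pure reducer: current state is a left fold over the event history.
function applyEvent(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case "Deposited":
      return balance + event.amount;
    case "Withdrawn":
      return balance - event.amount;
  }
}

function replay(events: AccountEvent[]): number {
  return events.reduce(applyEvent, 0);
}

// ISO-8601 UTC timestamps in the same format compare correctly as strings.
function balanceAt(events: AccountEvent[], isoTimestamp: string): number {
  return replay(events.filter(e => e.at <= isoTimestamp));
}

const history: AccountEvent[] = [
  { type: "Deposited", amount: 100, at: "2024-01-05T10:00:00Z" },
  { type: "Withdrawn", amount: 30, at: "2024-02-10T09:30:00Z" },
  { type: "Deposited", amount: 50, at: "2024-04-01T12:00:00Z" },
];

console.log(replay(history)); // 120
console.log(balanceAt(history, "2024-03-31T23:59:59Z")); // 70 (end of Q1)
```

Because `applyEvent` is pure, fixing a bug in it and replaying the same history gives you corrected state without touching stored data.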
When this is worth it
Use CQRS and event sourcing when you have one or more of:
Complex business logic with real audit requirements. Financial transactions, healthcare records, legal systems, anything subject to GDPR, SOX, or HIPAA where "we lost the change history" is not an acceptable answer.
High read-to-write ratios with performance requirements. E-commerce platforms running 1000 reads per write, analytics dashboards, reporting systems with complex queries against infrequently updated data.
Multiple read models for the same underlying data. Different views for customer-facing, admin-facing, and analytics. Same data shown many ways. Search requirements that mix full-text, faceted filters, and geo.
Temporal queries. Anything that has to answer "what was the state at this point in time?" Order status on December 1st. Account balance at end of quarter. Active subscriber count each month.
Skip it when you have:
- A simple CRUD app
- Small datasets (under 100K records)
- No real audit requirements
- A team that hasn't worked with the patterns before, and a tight deadline. The learning curve is steep enough to derail timelines.
Implementing event sourcing
The aggregate pattern
// Aggregate Root - business entity that events happen to
export abstract class AggregateRoot {
protected id: string;
protected version: number = 0;
private uncommittedEvents: DomainEvent[] = [];
constructor(id: string) {
this.id = id;
}
// Apply event (mutate state)
protected abstract applyEvent(event: DomainEvent): void;
// Record new event
protected recordEvent(event: DomainEvent): void {
this.applyEvent(event);
this.uncommittedEvents.push(event);
this.version++;
}
// Get events to persist
public getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
// Mark events as persisted
public markEventsAsCommitted(): void {
this.uncommittedEvents = [];
}
// Replay events to rebuild state
public loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.applyEvent(event);
this.version++;
}
}
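public getVersion(): number {
  return this.version;
}
// Used by the repository when saving events and snapshots
public getId(): string {
  return this.id;
}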
}
// Order aggregate
export class Order extends AggregateRoot {
private customerId!: string;
private items: OrderItem[] = [];
private status: OrderStatus = OrderStatus.Pending;
private totalAmount: number = 0;
// Factory method - creates new order
public static create(command: CreateOrderCommand): Order {
const order = new Order(command.orderId);
// Validate business rules
if (command.items.length === 0) {
throw new DomainException('Order must have at least one item');
}
// Record event
order.recordEvent(new OrderCreatedEvent(
command.orderId,
command.customerId,
command.items,
command.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
));
return order;
}
// Command - place order
public place(): void {
// Business rule
if (this.status !== OrderStatus.Pending) {
throw new DomainException('Order can only be placed when pending');
}
// Record event
this.recordEvent(new OrderPlacedEvent(this.id, new Date()));
}
// Command - ship order
public ship(trackingNumber: string): void {
if (this.status !== OrderStatus.Paid) {
throw new DomainException('Order must be paid before shipping');
}
this.recordEvent(new OrderShippedEvent(this.id, trackingNumber, new Date()));
}
// Command - cancel order
public cancel(reason: string): void {
if (![OrderStatus.Pending, OrderStatus.Placed].includes(this.status)) {
throw new DomainException('Cannot cancel order in current status');
}
this.recordEvent(new OrderCancelledEvent(this.id, reason, new Date()));
}
// Apply events to rebuild state
protected applyEvent(event: DomainEvent): void {
if (event instanceof OrderCreatedEvent) {
this.customerId = event.customerId;
this.items = event.items;
this.totalAmount = event.totalAmount;
this.status = OrderStatus.Pending;
} else if (event instanceof OrderPlacedEvent) {
this.status = OrderStatus.Placed;
} else if (event instanceof OrderPaidEvent) {
this.status = OrderStatus.Paid;
} else if (event instanceof OrderShippedEvent) {
this.status = OrderStatus.Shipped;
} else if (event instanceof OrderCancelledEvent) {
this.status = OrderStatus.Cancelled;
}
}
// Getters
public getStatus(): OrderStatus {
return this.status;
}
public getTotalAmount(): number {
return this.totalAmount;
}
}
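// Domain events
// uuid() below is assumed to be the v4 generator from the 'uuid' package:
// import { v4 as uuid } from 'uuid';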
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
timestamp: Date;
metadata: EventMetadata;
}
export interface EventMetadata {
userId?: string;
correlationId?: string;
causationId?: string;
}
export class OrderCreatedEvent implements DomainEvent {
public readonly eventType = 'OrderCreated';
public readonly eventId = uuid();
public readonly aggregateType = 'Order';
public readonly timestamp = new Date();
constructor(
public readonly aggregateId: string,
public readonly customerId: string,
public readonly items: OrderItem[],
public readonly totalAmount: number,
public readonly metadata: EventMetadata = {}
) {}
}
export class OrderPlacedEvent implements DomainEvent {
public readonly eventType = 'OrderPlaced';
public readonly eventId = uuid();
public readonly aggregateType = 'Order';
constructor(
public readonly aggregateId: string,
public readonly timestamp: Date,
public readonly metadata: EventMetadata = {}
) {}
}
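The aggregate above depends on several types that are not shown, so here is a smaller, self-contained version of the same lifecycle that runs as-is. The `Cart` aggregate, its events, and method names such as `pullUncommitted` and `fromHistory` are hypothetical stand-ins. The point is the shape: commands validate and record, `apply` is the only place state changes, and replaying stored events reproduces the same state.

```typescript
type CartEvent =
  | { type: "ItemAdded"; sku: string; qty: number }
  | { type: "CheckedOut" };

class Cart {
  private items = new Map<string, number>();
  private checkedOut = false;
  private uncommitted: CartEvent[] = [];
  version = 0;

  // Commands validate against current state, then record an event.
  addItem(sku: string, qty: number): void {
    if (this.checkedOut) throw new Error("Cart is already checked out");
    if (qty <= 0) throw new Error("Quantity must be positive");
    this.record({ type: "ItemAdded", sku, qty });
  }

  checkOut(): void {
    if (this.items.size === 0) throw new Error("Cannot check out an empty cart");
    this.record({ type: "CheckedOut" });
  }

  // State changes happen only here, so replay and live commands share one code path.
  private apply(event: CartEvent): void {
    if (event.type === "ItemAdded") {
      this.items.set(event.sku, (this.items.get(event.sku) ?? 0) + event.qty);
    } else {
      this.checkedOut = true;
    }
    this.version++;
  }

  private record(event: CartEvent): void {
    this.apply(event);
    this.uncommitted.push(event);
  }

  // Hands over new events for persistence and clears the buffer.
  pullUncommitted(): CartEvent[] {
    const events = this.uncommitted;
    this.uncommitted = [];
    return events;
  }

  static fromHistory(events: CartEvent[]): Cart {
    const cart = new Cart();
    events.forEach(e => cart.apply(e));
    return cart;
  }

  quantityOf(sku: string): number {
    return this.items.get(sku) ?? 0;
  }
}

const cart = new Cart();
cart.addItem("sku-1", 2);
cart.addItem("sku-1", 1);
const stored = cart.pullUncommitted(); // what would go to the event store

const reloaded = Cart.fromHistory(stored);
console.log(reloaded.quantityOf("sku-1"), reloaded.version); // 3 2
```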
Event store
// Event store interface
export interface EventStore {
saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
getEvents(aggregateId: string): Promise<DomainEvent[]>;
getEventsSince(aggregateId: string, version: number): Promise<DomainEvent[]>;
getAllEvents(fromPosition?: number, limit?: number): Promise<DomainEvent[]>;
}
// PostgreSQL event store implementation
export class PostgresEventStore implements EventStore {
constructor(private db: Database) {}
async saveEvents(
aggregateId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
// Use transaction for atomicity
await this.db.transaction(async (tx) => {
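// Optimistic concurrency check. Two transactions can both pass it at the same time;
// the UNIQUE(aggregate_id, version) constraint then rejects the slower insert.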
const currentVersion = await tx.one<{ version: number }>(
'SELECT COALESCE(MAX(version), 0) as version FROM events WHERE aggregate_id = $1',
[aggregateId]
);
if (currentVersion.version !== expectedVersion) {
throw new ConcurrencyException(
`Expected version ${expectedVersion}, but aggregate is at version ${currentVersion.version}`
);
}
// Insert events
for (let i = 0; i < events.length; i++) {
const event = events[i];
await tx.none(
`INSERT INTO events (
event_id, event_type, aggregate_id, aggregate_type, version,
data, metadata, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.eventType,
event.aggregateId,
event.aggregateType,
expectedVersion + i + 1,
JSON.stringify(event),
JSON.stringify(event.metadata),
event.timestamp
]
);
}
});
}
// The reads below use manyOrNone. Assuming pg-promise-style query methods,
// many() rejects when no rows match, which would break loading a new aggregate
// and polling for events when there are none; manyOrNone() resolves to [].
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
  const rows = await this.db.manyOrNone(
    `SELECT data FROM events
    WHERE aggregate_id = $1
    ORDER BY version ASC`,
    [aggregateId]
  );
  return rows.map(row => this.deserializeEvent(row.data));
}
async getEventsSince(aggregateId: string, version: number): Promise<DomainEvent[]> {
  const rows = await this.db.manyOrNone(
    `SELECT data FROM events
    WHERE aggregate_id = $1 AND version > $2
    ORDER BY version ASC`,
    [aggregateId, version]
  );
  return rows.map(row => this.deserializeEvent(row.data));
}
async getAllEvents(fromPosition: number = 0, limit: number = 1000): Promise<DomainEvent[]> {
  const rows = await this.db.manyOrNone(
    `SELECT data FROM events
    WHERE position > $1
    ORDER BY position ASC
    LIMIT $2`,
    [fromPosition, limit]
  );
  return rows.map(row => this.deserializeEvent(row.data));
}
private deserializeEvent(data: string | object): DomainEvent {
  // node-postgres returns JSONB columns already parsed, so only parse strings
  const event = typeof data === 'string' ? JSON.parse(data) : data;
  // Recreate proper event class instances
  return EventFactory.create(event.eventType, event);
}
}
// Event store schema
/*
CREATE TABLE events (
position BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL UNIQUE,
event_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
version INT NOT NULL,
data JSONB NOT NULL,
metadata JSONB NOT NULL,
timestamp TIMESTAMP NOT NULL,
UNIQUE(aggregate_id, version)
);
CREATE INDEX idx_events_aggregate ON events(aggregate_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_timestamp ON events(timestamp);
*/
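The `expectedVersion` check is the part people most often get wrong, so here is a hedged in-memory sketch that isolates it. `InMemoryEventStore`, `StoredEvent`, and `ConcurrencyError` are hypothetical names for this example. It shows two writers that both loaded the aggregate at version 1: the first append succeeds and the second is rejected.

```typescript
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}

interface StoredEvent {
  aggregateId: string;
  version: number;
  type: string;
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  // Appends only if the stream is still at the version the caller loaded.
  append(aggregateId: string, types: string[], expectedVersion: number): void {
    const stream = this.streams.get(aggregateId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, stream is at ${stream.length}`
      );
    }
    types.forEach((type, i) =>
      stream.push({ aggregateId, version: expectedVersion + i + 1, type })
    );
    this.streams.set(aggregateId, stream);
  }

  read(aggregateId: string): StoredEvent[] {
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}

const store = new InMemoryEventStore();
store.append("ord-1", ["OrderCreated"], 0);

// Two writers both loaded the aggregate at version 1.
store.append("ord-1", ["OrderPlaced"], 1); // first writer wins

let conflict = false;
try {
  store.append("ord-1", ["OrderCancelled"], 1); // second writer is stale
} catch (err) {
  conflict = err instanceof Error && err.name === "ConcurrencyError";
}
console.log(conflict, store.read("ord-1").length); // true 2
```

The usual response to a conflict is to reload the aggregate, re-run the command against the fresh state, and try again. In the SQL version, the unique constraint on `(aggregate_id, version)` is what makes this safe under real concurrency.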
Snapshotting for performance
// Snapshot to avoid replaying thousands of events
export interface Snapshot {
aggregateId: string;
aggregateType: string;
version: number;
state: any;
timestamp: Date;
}
export class SnapshotStore {
constructor(private db: Database) {}
async saveSnapshot(snapshot: Snapshot): Promise<void> {
await this.db.none(
`INSERT INTO snapshots (aggregate_id, aggregate_type, version, state, timestamp)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (aggregate_id) DO UPDATE SET
version = $3, state = $4, timestamp = $5`,
[
snapshot.aggregateId,
snapshot.aggregateType,
snapshot.version,
JSON.stringify(snapshot.state),
snapshot.timestamp
]
);
}
async getSnapshot(aggregateId: string): Promise<Snapshot | null> {
const row = await this.db.oneOrNone(
'SELECT * FROM snapshots WHERE aggregate_id = $1',
[aggregateId]
);
return row ? {
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
state: JSON.parse(row.state),
timestamp: row.timestamp
} : null;
}
}
// Repository with snapshotting
export class EventSourcedRepository<T extends AggregateRoot> {
private snapshotFrequency = 100; // Snapshot every 100 events
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore,
private aggregateFactory: (id: string) => T
) {}
async load(aggregateId: string): Promise<T> {
// Try to load from snapshot
const snapshot = await this.snapshotStore.getSnapshot(aggregateId);
let aggregate: T;
let events: DomainEvent[];
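if (snapshot) {
  // Load from snapshot and replay events since then.
  // loadFromSnapshot is assumed to restore both state and version. Without the
  // version, loadFromHistory counts up from 0 and the next save fails the concurrency check.
  aggregate = this.aggregateFactory(aggregateId);
  aggregate.loadFromSnapshot(snapshot.state, snapshot.version);
  events = await this.eventStore.getEventsSince(aggregateId, snapshot.version);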
} else {
// No snapshot, replay all events
aggregate = this.aggregateFactory(aggregateId);
events = await this.eventStore.getEvents(aggregateId);
}
// Replay events
aggregate.loadFromHistory(events);
return aggregate;
}
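async save(aggregate: T): Promise<void> {
  const events = aggregate.getUncommittedEvents();
  if (events.length === 0) {
    return; // Nothing to persist
  }
  const expectedVersion = aggregate.getVersion() - events.length;
  // Save events
  await this.eventStore.saveEvents(aggregate.getId(), events, expectedVersion);
  aggregate.markEventsAsCommitted();
  // Snapshot when this save crossed a multiple of snapshotFrequency.
  // A plain `version % frequency === 0` check misses saves that jump the boundary (99 -> 101).
  const previousBucket = Math.floor(expectedVersion / this.snapshotFrequency);
  const currentBucket = Math.floor(aggregate.getVersion() / this.snapshotFrequency);
  if (currentBucket > previousBucket) {
    await this.snapshotStore.saveSnapshot({
      aggregateId: aggregate.getId(),
      aggregateType: aggregate.getType(),
      version: aggregate.getVersion(),
      state: aggregate.getState(),
      timestamp: new Date()
    });
  }
}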
}
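A snapshot is only valid if loading from it gives the same result as a full replay. The sketch below checks that property on a hypothetical counter aggregate, and includes a small helper for deciding when a save has crossed a snapshot boundary. All names here are illustrative.

```typescript
type CounterEvent = { version: number; delta: number };
type CounterSnapshot = { version: number; total: number };

// Fold events onto a starting point: either an empty state or a snapshot.
function replay(
  events: CounterEvent[],
  from: CounterSnapshot = { version: 0, total: 0 }
): CounterSnapshot {
  return events.reduce(
    (state, e) => ({ version: e.version, total: state.total + e.delta }),
    from
  );
}

// True when moving from prevVersion to newVersion passes a multiple of `every`.
function crossedSnapshotBoundary(prevVersion: number, newVersion: number, every: number): boolean {
  return Math.floor(newVersion / every) > Math.floor(prevVersion / every);
}

// 250 events with deltas 1..250.
const log: CounterEvent[] = Array.from({ length: 250 }, (_, i) => ({
  version: i + 1,
  delta: i + 1,
}));

const full = replay(log);
const snapshot = replay(log.slice(0, 200)); // snapshot taken at version 200
const tail = log.filter(e => e.version > snapshot.version); // only the newer events
const fromSnapshot = replay(tail, snapshot);

console.log(full.total, fromSnapshot.total, tail.length); // 31375 31375 50
console.log(crossedSnapshotBoundary(99, 101, 100)); // true
```

Note that the snapshot carries its version. The tail query and the next optimistic concurrency check both depend on it.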
CQRS projections
// Projection - builds read models from events
export abstract class Projection {
abstract handles(): string[]; // Event types this projection handles
abstract project(event: DomainEvent): Promise<void>;
}
// Order details projection - optimized for order detail view
export class OrderDetailsProjection extends Projection {
constructor(private db: Database) {
super();
}
handles(): string[] {
return ['OrderCreated', 'OrderPlaced', 'OrderPaid', 'OrderShipped', 'OrderCancelled'];
}
async project(event: DomainEvent): Promise<void> {
if (event instanceof OrderCreatedEvent) {
await this.db.none(
`INSERT INTO order_details_view (
order_id, customer_id, total_amount, status,
items, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.aggregateId,
event.customerId,
event.totalAmount,
'Pending',
JSON.stringify(event.items),
event.timestamp,
event.timestamp
]
);
} else if (event instanceof OrderShippedEvent) {
await this.db.none(
`UPDATE order_details_view
SET status = $1, tracking_number = $2, updated_at = $3
WHERE order_id = $4`,
['Shipped', event.trackingNumber, event.timestamp, event.aggregateId]
);
}
// Handle other events...
}
}
// Customer order history projection - optimized for listing orders
export class CustomerOrderHistoryProjection extends Projection {
constructor(private db: Database) {
super();
}
handles(): string[] {
return ['OrderCreated', 'OrderCancelled'];
}
async project(event: DomainEvent): Promise<void> {
if (event instanceof OrderCreatedEvent) {
await this.db.none(
`INSERT INTO customer_order_history (
customer_id, order_id, order_date, total_amount, status
) VALUES ($1, $2, $3, $4, $5)`,
[event.customerId, event.aggregateId, event.timestamp, event.totalAmount, 'Active']
);
} else if (event instanceof OrderCancelledEvent) {
await this.db.none(
`UPDATE customer_order_history
SET status = $1
WHERE order_id = $2`,
['Cancelled', event.aggregateId]
);
}
}
}
// Projection engine - processes events to update read models
export class ProjectionEngine {
private projections: Map<string, Projection[]> = new Map();
constructor(
private eventStore: EventStore,
private checkpointStore: CheckpointStore
) {}
registerProjection(projection: Projection): void {
const eventTypes = projection.handles();
for (const eventType of eventTypes) {
if (!this.projections.has(eventType)) {
this.projections.set(eventType, []);
}
this.projections.get(eventType)!.push(projection);
}
}
async start(): Promise<void> {
console.log('Starting projection engine...');
// Load last processed position
let position = await this.checkpointStore.getCheckpoint('projections');
while (true) {
// Fetch batch of events
const events = await this.eventStore.getAllEvents(position, 100);
if (events.length === 0) {
// No new events, wait and retry
await this.sleep(1000);
continue;
}
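// Process each event
for (const event of events) {
  const handlers = this.projections.get(event.eventType) || [];
  // Project to all registered projections
  await Promise.all(
    handlers.map(projection => projection.project(event))
  );
  // Caution: counting events only matches the position column while it has no gaps.
  // BIGSERIAL skips values when a transaction rolls back, so a production store
  // should return each event's stored position and checkpoint that instead.
  position++;
}
// Save checkpoint once per batch. A crash mid-batch replays that batch,
// so projections should tolerate seeing the same event twice.
await this.checkpointStore.saveCheckpoint('projections', position);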
console.log(`Processed ${events.length} events, position: ${position}`);
}
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Checkpoint store - tracks projection progress
export class CheckpointStore {
constructor(private db: Database) {}
async getCheckpoint(name: string): Promise<number> {
const row = await this.db.oneOrNone(
'SELECT position FROM checkpoints WHERE name = $1',
[name]
);
return Number(row?.position ?? 0); // node-postgres returns BIGINT as a string by default
}
async saveCheckpoint(name: string, position: number): Promise<void> {
await this.db.none(
`INSERT INTO checkpoints (name, position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (name) DO UPDATE SET
position = $2, updated_at = NOW()`,
[name, position]
);
}
}
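Two properties matter for a projection runner: it should checkpoint the position actually stored with each event, and re-delivering an event should be harmless. The sketch below shows one way to get both, under the assumption that the store returns a `position` with every event. `PositionedEvent`, `OrderStatusProjection`, and `fetchBatch` are hypothetical names.

```typescript
interface PositionedEvent {
  position: number;
  type: string;
  orderId: string;
}

class OrderStatusProjection {
  readonly statusByOrder = new Map<string, string>();
  checkpoint = 0; // highest position applied so far

  // Skipping anything at or below the checkpoint makes redelivery a no-op.
  apply(event: PositionedEvent): boolean {
    if (event.position <= this.checkpoint) return false;
    this.statusByOrder.set(event.orderId, event.type);
    this.checkpoint = event.position;
    return true;
  }
}

// Stand-in for "SELECT ... WHERE position > $1 ORDER BY position LIMIT $2".
function fetchBatch(log: PositionedEvent[], after: number, limit: number): PositionedEvent[] {
  return log.filter(e => e.position > after).slice(0, limit);
}

// Position 3 is missing, as happens when a transaction holding a sequence value rolls back.
const log: PositionedEvent[] = [
  { position: 1, type: "OrderCreated", orderId: "ord-1" },
  { position: 2, type: "OrderPlaced", orderId: "ord-1" },
  { position: 4, type: "OrderCreated", orderId: "ord-2" },
  { position: 5, type: "OrderShipped", orderId: "ord-1" },
];

const projection = new OrderStatusProjection();
let applied = 0;
for (;;) {
  const batch = fetchBatch(log, projection.checkpoint, 2);
  if (batch.length === 0) break;
  for (const event of batch) {
    if (projection.apply(event)) applied++;
  }
}

// Redelivering an already-processed event changes nothing.
const reapplied = projection.apply({ position: 1, type: "OrderCreated", orderId: "ord-1" });

console.log(applied, projection.checkpoint, reapplied); // 4 5 false
```

A runner that counted events instead would reach 3 after the first three events and then fetch position 4 a second time.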
Handling eventual consistency
// Problem: Read model may not be immediately updated after command
async function placeOrder() {
// Execute command
await commandBus.execute(new PlaceOrderCommand('ord-123'));
// Try to read immediately
const order = await queryBus.execute(new GetOrderQuery('ord-123'));
// May still show old status! Projection hasn't processed event yet
console.log(order.status); // Might be "Pending" instead of "Placed"
}
// Solution 1: Return version from command, poll until projection catches up
export class CommandResult {
constructor(
public readonly aggregateId: string,
public readonly version: number
) {}
}
async function placeOrderWithConsistency() {
  // Command returns version
  const result: CommandResult = await commandBus.execute(
    new PlaceOrderCommand('ord-123')
  );
  // Poll until projection has processed this version.
  // Plain function call: a standalone function has no usable `this`.
  const order = await pollUntilVersion(
    'ord-123',
    result.version,
    { maxAttempts: 10, delayMs: 100 }
  );
  console.log(order.status); // Reflects at least the "Placed" event
}
async function pollUntilVersion(
  orderId: string,
  expectedVersion: number,
  options: { maxAttempts: number; delayMs: number }
): Promise<Order> {
  for (let i = 0; i < options.maxAttempts; i++) {
    const order = await queryBus.execute(new GetOrderQuery(orderId));
    // Requires the read model to store the aggregate version it last applied
    if (order.version >= expectedVersion) {
      return order; // Projection caught up
    }
    await new Promise(resolve => setTimeout(resolve, options.delayMs));
  }
  throw new Error('Projection did not catch up in time');
}
// Solution 2: Use SSE/WebSocket to notify when projection updates
export class ProjectionNotifier {
private subscribers: Map<string, Set<(event: DomainEvent) => void>> = new Map();
subscribe(aggregateId: string, callback: (event: DomainEvent) => void): () => void {
if (!this.subscribers.has(aggregateId)) {
this.subscribers.set(aggregateId, new Set());
}
this.subscribers.get(aggregateId)!.add(callback);
// Return unsubscribe function
return () => {
this.subscribers.get(aggregateId)?.delete(callback);
};
}
notify(event: DomainEvent): void {
const callbacks = this.subscribers.get(event.aggregateId);
if (callbacks) {
callbacks.forEach(callback => callback(event));
}
}
}
// Client-side: Wait for projection update
async function placeOrderWithNotification() {
  const orderId = 'ord-123';
  // Subscribe to updates
  const updates = new EventSource(`/api/orders/${orderId}/updates`);
  // Attach the listener before sending the command. Otherwise the update can
  // arrive while the command is still in flight and be missed, leaving the wait hanging.
  const projectionUpdated = new Promise<void>((resolve) => {
    updates.addEventListener('OrderProjectionUpdated', () => {
      updates.close();
      resolve();
    });
  });
  // Execute command
  await commandBus.execute(new PlaceOrderCommand(orderId));
  // Wait for projection update event
  await projectionUpdated;
  // The read model now reflects the command
  const order = await queryBus.execute(new GetOrderQuery(orderId));
  console.log(order.status); // "Placed"
}
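The polling approach generalizes into a small helper that is easy to test in isolation. This is a sketch under stated assumptions: `pollUntil`, the `readModel` object, and `simulatePlaceOrder` are hypothetical, and the 30 ms delay stands in for projection lag.

```typescript
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Resolves with the first value that satisfies `done`, or rejects after maxAttempts.
async function pollUntil<T>(
  read: () => Promise<T>,
  done: (value: T) => boolean,
  opts: { maxAttempts: number; delayMs: number }
): Promise<T> {
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    const value = await read();
    if (done(value)) return value;
    await sleep(opts.delayMs);
  }
  throw new Error(`Read model did not catch up after ${opts.maxAttempts} attempts`);
}

// Stand-in read model that lags the write side by about 30 ms.
const readModel = { version: 1, status: "Pending" };

function simulatePlaceOrder(): number {
  setTimeout(() => {
    readModel.version = 2;
    readModel.status = "Placed";
  }, 30);
  return 2; // version the write side assigned
}

async function demo(): Promise<string> {
  const writtenVersion = simulatePlaceOrder();
  const view = await pollUntil(
    async () => ({ ...readModel }),
    v => v.version >= writtenVersion,
    { maxAttempts: 20, delayMs: 10 }
  );
  return view.status;
}

demo().then(status => console.log(status)); // Placed
```

Pick `maxAttempts` and `delayMs` from your observed projection lag, and decide what the UI does when the poll times out. Showing the command's own result optimistically is a common fallback.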
Event versioning
// Problem: Events evolve over time
// Version 1: OrderCreated event
interface OrderCreatedEventV1 {
orderId: string;
customerId: string;
items: OrderItem[];
}
// Version 2: Added total amount (breaking change if not handled)
interface OrderCreatedEventV2 {
orderId: string;
customerId: string;
items: OrderItem[];
totalAmount: number; // New field
}
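// Solution: Upcasting - convert old events to new format
// Stored events are assumed to carry their payload under `data`, plus an optional schemaVersion
type VersionedEvent = DomainEvent & { data?: any; schemaVersion?: number };
export class EventUpcaster {
  upcast(event: VersionedEvent): VersionedEvent {
    // Compare against undefined so a legitimate total of 0 is not treated as missing
    if (event.eventType === 'OrderCreated' && event.data?.totalAmount === undefined) {
      // V1 event, upcast to V2
      const items: OrderItem[] = event.data?.items ?? [];
      return {
        ...event,
        data: {
          ...event.data,
          totalAmount: items.reduce(
            (sum, item) => sum + item.price * item.quantity,
            0
          )
        },
        // Schema version of the payload, not the aggregate version
        schemaVersion: 2
      };
    }
    return event;
  }
}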
// Upcaster in event store
export class EventStoreWithUpcasting {
constructor(
private eventStore: EventStore,
private upcaster: EventUpcaster
) {}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
const events = await this.eventStore.getEvents(aggregateId);
// Upcast all events to latest version
return events.map(event => this.upcaster.upcast(event));
}
}
// Alternative: Keep multiple event handlers
export class Order extends AggregateRoot {
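protected applyEvent(event: DomainEvent): void {
  if (event.eventType !== 'OrderCreated') {
    return;
  }
  // OrderCreatedEventV1/V2 are interfaces, which do not exist at runtime,
  // so instanceof cannot tell them apart. Discriminate on the payload shape instead.
  const created = event as unknown as OrderCreatedEventV1 | OrderCreatedEventV2;
  if ('totalAmount' in created) {
    this.applyOrderCreatedV2(created);
  } else {
    this.applyOrderCreatedV1(created);
  }
}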
private applyOrderCreatedV1(event: OrderCreatedEventV1): void {
this.customerId = event.customerId;
this.items = event.items;
// Calculate total from items
this.totalAmount = event.items.reduce(
(sum, item) => sum + item.price * item.quantity,
0
);
}
private applyOrderCreatedV2(event: OrderCreatedEventV2): void {
this.customerId = event.customerId;
this.items = event.items;
// Use total from event
this.totalAmount = event.totalAmount;
}
}
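Once an event type has been through more than one schema change, a single `if` per version gets hard to follow. One common arrangement is a chain of upcasters, each lifting an event by exactly one version. The sketch below assumes events are stored in an envelope with an explicit `schemaVersion`. The envelope shape, the registry key format, and the `currency` field added in the hypothetical v3 are all illustrative.

```typescript
interface Envelope {
  type: string;
  schemaVersion: number;
  data: Record<string, unknown>;
}

type Upcaster = (data: Record<string, unknown>) => Record<string, unknown>;

// Key is `${type}@${fromVersion}`; each upcaster lifts data by exactly one version.
const upcasters = new Map<string, Upcaster>([
  [
    "OrderCreated@1",
    d => {
      const items = d.items as { price: number; quantity: number }[];
      return { ...d, totalAmount: items.reduce((sum, i) => sum + i.price * i.quantity, 0) };
    },
  ],
  // Hypothetical v3 adds a currency field with an assumed default.
  ["OrderCreated@2", d => ({ ...d, currency: "USD" })],
]);

// Applies upcasters until no step is registered for the current version.
function upcast(event: Envelope): Envelope {
  let current = event;
  for (;;) {
    const step = upcasters.get(`${current.type}@${current.schemaVersion}`);
    if (!step) return current; // already at the latest known version
    current = {
      ...current,
      schemaVersion: current.schemaVersion + 1,
      data: step(current.data),
    };
  }
}

const v1: Envelope = {
  type: "OrderCreated",
  schemaVersion: 1,
  data: {
    orderId: "ord-1",
    customerId: "cust-1",
    items: [
      { price: 10, quantity: 2 },
      { price: 5, quantity: 1 },
    ],
  },
};

const latest = upcast(v1);
console.log(latest.schemaVersion, latest.data.totalAmount, latest.data.currency); // 3 25 USD
```

The stored event is never rewritten. Upcasting happens on read, so aggregates and projections only ever need to understand the latest shape.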
Rebuilding projections
// Rebuild projection from scratch (useful for bug fixes or new projections)
export class ProjectionRebuilder {
constructor(
private eventStore: EventStore,
private projection: Projection,
private db: Database
) {}
async rebuild(): Promise<void> {
console.log('Starting projection rebuild...');
// Clear existing data
await this.clearProjection();
let position = 0;
let processed = 0;
while (true) {
// Fetch batch of events
const events = await this.eventStore.getAllEvents(position, 1000);
if (events.length === 0) {
break; // No more events
}
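// Process events
const handled = new Set(this.projection.handles());
for (const event of events) {
  if (handled.has(event.eventType)) {
    await this.projection.project(event);
    processed++;
  }
  // Assumes positions are gap-free; otherwise advance to the event's stored position
  position++;
}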
console.log(`Rebuilt ${processed} events, position: ${position}`);
}
console.log('Projection rebuild complete');
}
private async clearProjection(): Promise<void> {
// Clear projection-specific tables
await this.db.none('TRUNCATE TABLE order_details_view');
}
}
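Truncating the live table means queries see an empty or partial read model until the rebuild finishes. A common alternative is to build the replacement off to the side and swap it in at the end. The sketch below shows the idea in memory. `ReadModelHost` and the event shapes are hypothetical, and with SQL the swap would typically be a table rename or a view repoint rather than a reference assignment.

```typescript
type OrderEvent = { type: "OrderCreated" | "OrderShipped"; orderId: string };
type OrderView = Map<string, string>;

function project(view: OrderView, event: OrderEvent): void {
  view.set(event.orderId, event.type === "OrderCreated" ? "Pending" : "Shipped");
}

class ReadModelHost {
  private live: OrderView = new Map();

  get(orderId: string): string | undefined {
    return this.live.get(orderId);
  }

  // Builds the replacement separately; queries keep hitting `live` until the swap.
  rebuild(events: OrderEvent[], batchSize: number): number {
    const fresh: OrderView = new Map();
    let processed = 0;
    for (let start = 0; start < events.length; start += batchSize) {
      for (const event of events.slice(start, start + batchSize)) {
        project(fresh, event);
        processed++;
      }
    }
    this.live = fresh; // single reference swap
    return processed;
  }
}

const events: OrderEvent[] = [
  { type: "OrderCreated", orderId: "ord-1" },
  { type: "OrderCreated", orderId: "ord-2" },
  { type: "OrderShipped", orderId: "ord-1" },
];

const host = new ReadModelHost();
const count = host.rebuild(events, 2);
console.log(count, host.get("ord-1"), host.get("ord-2")); // 3 Shipped Pending
```

In a live system, events keep arriving during the rebuild, so the new copy has to catch up to the head of the log before the swap.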
Performance numbers
// Comparison: Traditional vs CQRS/ES
interface PerformanceMetrics {
operation: string;
traditional: number; // milliseconds
cqrsES: number;
improvement: string;
}
const metrics: PerformanceMetrics[] = [
{
operation: 'Write operation (insert/update)',
traditional: 50,
cqrsES: 80,
improvement: '-60% (slower due to event storage)'
},
{
operation: 'Simple read (by ID)',
traditional: 20,
cqrsES: 5,
improvement: '4x faster (optimized read model)'
},
{
operation: 'Complex query (joins, aggregations)',
traditional: 500,
cqrsES: 50,
improvement: '10x faster (denormalized read model)'
},
{
operation: 'Audit query (change history)',
traditional: 2000,
cqrsES: 100,
improvement: '20x faster (events stored natively)'
},
{
operation: 'Time travel query',
traditional: Number.NaN, // No baseline: not answerable without separate history tables
cqrsES: 200,
improvement: 'Needs extra history tables vs built in'
}
];
// Storage comparison
interface StorageComparison {
dataset: string;
traditional: string;
cqrsES: string;
overhead: string;
}
const storage: StorageComparison[] = [
{
dataset: '1M orders',
traditional: '5GB',
cqrsES: '15GB',
overhead: '3x (events + projections)'
},
{
dataset: 'With 5 projections',
traditional: '5GB',
cqrsES: '20GB',
overhead: '4x (multiple read models)'
}
];
// Note: Storage is cheaper than compute
// CQRS/ES trades storage for query performance and auditability
Wrap-up
CQRS and event sourcing work well in domains that genuinely need audit trails, temporal queries, or independent read/write scaling. Financial services get the full auditability regulators want. Read-heavy e-commerce platforms can cut read-side costs by as much as 70%. Regulated industries get compliance baked into the storage model instead of bolted on.
The cost is real. Eventual consistency forces UX choices. Event versioning needs planning before, not after, the first schema change. Projection rebuilds need to be possible without a maintenance window. The biggest mistake teams make is reaching for these patterns by default. Apply them to bounded contexts that need them, not to the whole system.
Start small. Pick one aggregate, implement a basic event store, build one projection, and see if the benefits show up in practice. If audit trails and temporal queries are doing real work, expand from there. If you're never actually using the time-travel queries, the storage and complexity overhead isn't worth it and traditional CRUD is the better answer.
Next steps
- Pick a bounded context that has real audit or temporal-query needs
- Design aggregates with business rules and domain events
- Build the event store on PostgreSQL or EventStoreDB
- Write your first projection optimized for one specific query
- Decide upfront how you'll handle eventual consistency (poll vs notify)
- Plan event versioning before the first schema change, not after
- Monitor projection lag so you know when read models fall behind
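For the last item, projection lag can be approximated as the distance between the head of the event log and each projection's checkpoint. The sketch below assumes you can read both numbers. `projectionLag`, `LagReport`, and the threshold are hypothetical, and if positions have gaps the figure is an upper bound on events outstanding rather than an exact count.

```typescript
interface LagReport {
  name: string;
  eventsBehind: number;
  stale: boolean;
}

// Compares each projection's checkpoint with the newest position in the event log.
function projectionLag(
  headPosition: number,
  checkpoints: Record<string, number>,
  maxBehind: number
): LagReport[] {
  return Object.entries(checkpoints).map(([name, position]) => {
    const eventsBehind = Math.max(0, headPosition - position);
    return { name, eventsBehind, stale: eventsBehind > maxBehind };
  });
}

const report = projectionLag(10500, { orderDetails: 10480, customerHistory: 9200 }, 500);
for (const entry of report) {
  console.log(`${entry.name}: ${entry.eventsBehind} behind${entry.stale ? " (stale)" : ""}`);
}
```

Exporting `eventsBehind` as a metric and alerting on `stale` gives early warning before users notice outdated reads.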