CQRS and Event Sourcing: Practical Implementation Beyond the Hype

Implement CQRS and Event Sourcing in production. Complete guide with code examples, event store design, projections, and real-world patterns for complex domains.

17 minutes
Advanced
2025-10-17

CQRS (Command Query Responsibility Segregation) and Event Sourcing are two architectural patterns that, when combined, enable systems with complete audit trails, time-travel debugging, and independent scaling of reads and writes. Reported gains include 10x read performance improvements in financial services and 70% cost reductions on read-heavy e-commerce workloads, while regulated industries gain complete auditability. However, these patterns introduce complexity: eventual consistency, event versioning, and projection rebuilds. With CQRS adoption reportedly up 28% in financial services and purpose-built stores such as EventStoreDB maturing, the patterns are moving beyond the hype into practical, production-ready solutions.

This comprehensive guide shows you when to use CQRS and Event Sourcing, how to implement them correctly, and how to avoid common pitfalls that lead to over-engineered systems.

Understanding CQRS

CQRS separates read and write operations into distinct models:

Traditional approach: Single model for reads and writes

// Traditional CRUD - same model for everything
class OrderService {
  async createOrder(data: CreateOrderDTO): Promise<Order> {
    const order = new Order(data);
    await this.repository.save(order);
    return order;  // Same model used for write and read
  }

  async getOrder(id: string): Promise<Order> {
    return this.repository.findById(id);  // Same model
  }

  async updateOrder(id: string, data: UpdateOrderDTO): Promise<Order> {
    const order = await this.repository.findById(id);
    order.update(data);
    await this.repository.save(order);
    return order;  // Same model
  }
}

CQRS approach: Separate models for commands (writes) and queries (reads)

// Command side - optimized for writes and business logic
class OrderCommandHandler {
  async handle(command: CreateOrderCommand): Promise<void> {
    // Domain model focused on business rules
    const order = Order.create(command);
    order.validate();
    order.apply(new OrderCreatedEvent(...));
    
    await this.eventStore.save(order);
    // No domain data returned - at most an acknowledgement such as the new version
  }
}

// Query side - optimized for reads and reporting
class OrderQueryHandler {
  // One method per query type: TypeScript rejects two implementations that share the name `handle`
  async getOrderDetails(query: GetOrderDetailsQuery): Promise<OrderDetailsDTO> {
    // Read model optimized for this specific query
    // Could be denormalized, pre-joined, cached
    return this.readRepository.getOrderDetails(query.orderId);
  }

  async getCustomerOrderHistory(query: GetCustomerOrderHistoryQuery): Promise<OrderSummaryDTO[]> {
    // Different read model for this query
    // Optimized with different indexes, structure
    return this.readRepository.getCustomerOrderHistory(query.customerId);
  }
}

Benefits of CQRS:

  • Independent scaling: Scale reads and writes separately (often 100:1 read:write ratio)
  • Optimized models: Write model for business logic, read models for specific queries
  • Technology flexibility: Use PostgreSQL for writes, Elasticsearch for full-text search, Redis for caching
  • Team independence: Different teams can work on command and query sides
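
The separation can be seen end to end in a few lines. The following is a minimal in-memory sketch, not a production design: the names `OrderCommands`, `CustomerSpendView`, and `OrderPlaced` are hypothetical, and the projection runs synchronously here, whereas a real system would usually update the read model asynchronously.

```typescript
// Hypothetical names throughout; an in-memory sketch of the command/query split.
type OrderPlaced = { type: 'OrderPlaced'; orderId: string; customerId: string; total: number };

// Write side: validates the command and appends an event. Returns no data.
class OrderCommands {
  constructor(private readonly log: OrderPlaced[]) {}

  placeOrder(orderId: string, customerId: string, total: number): void {
    if (total <= 0) throw new Error('Order total must be positive');
    this.log.push({ type: 'OrderPlaced', orderId, customerId, total });
  }
}

// Read side: a denormalized view shaped for one specific query.
class CustomerSpendView {
  private readonly totals = new Map<string, number>();

  project(event: OrderPlaced): void {
    const current = this.totals.get(event.customerId) ?? 0;
    this.totals.set(event.customerId, current + event.total);
  }

  totalSpent(customerId: string): number {
    return this.totals.get(customerId) ?? 0;
  }
}

const log: OrderPlaced[] = [];
const commands = new OrderCommands(log);
const view = new CustomerSpendView();

commands.placeOrder('ord-1', 'cust-1', 40);
commands.placeOrder('ord-2', 'cust-1', 60);
log.forEach(event => view.project(event)); // synchronous here for simplicity
```

The write side never reads from `CustomerSpendView`, and the view never validates business rules, which is what allows the two sides to be scaled and stored independently.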

Understanding Event Sourcing

Event Sourcing stores every state change as an immutable event:

Traditional approach: Store current state

// Traditional state storage
interface Order {
  id: string;
  status: OrderStatus;  // Only current status
  totalAmount: number;
  updatedAt: Date;
  // Lost history: Who changed status? When? Why?
}

// Update loses history
await db.update('orders', orderId, { status: 'Shipped', updatedAt: new Date() });
// Previous status is gone forever

Event Sourcing approach: Store all events

// Event sourcing - store every state change
interface OrderEvent {
  eventId: string;
  aggregateId: string;
  eventType: string;
  data: any;
  metadata: {
    timestamp: Date;
    userId: string;
    reason?: string;
  };
}

// Events stored immutably
const events: OrderEvent[] = [
  { eventId: '1', aggregateId: 'ord-123', eventType: 'OrderCreated', data: {...}, metadata: {...} },
  { eventId: '2', aggregateId: 'ord-123', eventType: 'OrderPaid', data: {...}, metadata: {...} },
  { eventId: '3', aggregateId: 'ord-123', eventType: 'OrderShipped', data: {...}, metadata: {...} },
  // Complete history preserved forever
];

// Rebuild current state by replaying events
function replayEvents(events: OrderEvent[]): Order {
  let order = new Order();
  
  for (const event of events) {
    order = order.apply(event);  // Apply each event
  }
  
  return order;  // Current state reconstructed
}

// Time travel: Rebuild state at any point in history
function getStateAt(events: OrderEvent[], timestamp: Date): Order {
  const pastEvents = events.filter(e => e.metadata.timestamp <= timestamp);
  return replayEvents(pastEvents);
}

Benefits of Event Sourcing:

  • Complete audit trail: Every change recorded with who, when, why
  • Time travel: Reconstruct state at any point in history
  • Event replay: Rebuild projections, fix bugs retroactively
  • Business intelligence: Analyze how entities evolved over time
  • Debugging: See exact sequence of events that led to current state
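
Replay and time travel both reduce to folding events into state. As a small worked example, assuming a hypothetical account domain with `Deposited` and `Withdrawn` events (not part of the order model above), the current balance and the balance at a past moment come from the same reducer:

```typescript
// Hypothetical account events; timestamps are ISO-8601 UTC strings.
type AccountEvent =
  | { type: 'Deposited'; amount: number; at: string }
  | { type: 'Withdrawn'; amount: number; at: string };

// Pure reducer: applies one event to the running balance.
function applyAccountEvent(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case 'Deposited':
      return balance + event.amount;
    case 'Withdrawn':
      return balance - event.amount;
  }
}

// Current state is the fold of every event, starting from zero.
function balanceOf(events: AccountEvent[]): number {
  return events.reduce(applyAccountEvent, 0);
}

// Time travel: fold only the events recorded up to `at`.
// ISO-8601 UTC strings in the same format compare correctly as plain strings.
function balanceAt(events: AccountEvent[], at: string): number {
  return balanceOf(events.filter(event => event.at <= at));
}

const history: AccountEvent[] = [
  { type: 'Deposited', amount: 100, at: '2025-01-05T10:00:00Z' },
  { type: 'Withdrawn', amount: 30, at: '2025-02-10T09:30:00Z' },
  { type: 'Deposited', amount: 50, at: '2025-03-01T12:00:00Z' },
];

const currentBalance = balanceOf(history);                        // 100 - 30 + 50
const endOfFebruary = balanceAt(history, '2025-02-28T23:59:59Z'); // 100 - 30
```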

When to Use CQRS and Event Sourcing

Use When:

1. Complex Business Logic with Audit Requirements

  • Financial transactions (banking, trading, payments)
  • Healthcare records (complete patient history required)
  • Legal systems (every change must be auditable)
  • Regulatory compliance (GDPR, SOX, HIPAA)

2. High Read/Write Ratio with Performance Requirements

  • E-commerce (1000 reads per 1 write)
  • Analytics dashboards (heavy reads, light writes)
  • Reporting systems (complex queries, infrequent updates)

3. Multiple Read Models Needed

  • Different views for different users (customer, admin, analytics)
  • Same data displayed in many formats
  • Complex search requirements (full-text, faceted, geo)

4. Temporal Queries Required

  • "What was the order status on December 1st?"
  • "Show me the account balance as of end of quarter"
  • "How many active subscribers did we have each month?"

Don't Use When:

  • Simple CRUD applications
  • Small datasets (< 100K records)
  • No audit requirements
  • Team unfamiliar with patterns (steep learning curve)
  • Tight deadlines (adds complexity)
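
To make the temporal queries listed under "Use When" concrete, here is a sketch of the monthly active subscribers question answered directly from an event log. The event shapes and the function name are assumptions for illustration, and the log is assumed to be in chronological order.

```typescript
// Hypothetical subscription events; `at` is an ISO-8601 UTC timestamp.
type SubscriptionEvent =
  | { type: 'Subscribed'; userId: string; at: string }
  | { type: 'Cancelled'; userId: string; at: string };

// Number of users with an active subscription at the end of `month` ('YYYY-MM').
// Assumes `events` is sorted by time, as an event log normally is.
function activeSubscribersAtEndOf(events: SubscriptionEvent[], month: string): number {
  const active = new Set<string>();
  for (const event of events) {
    if (event.at.slice(0, 7) > month) break; // event belongs to a later month
    if (event.type === 'Subscribed') {
      active.add(event.userId);
    } else {
      active.delete(event.userId);
    }
  }
  return active.size;
}

const subscriptionLog: SubscriptionEvent[] = [
  { type: 'Subscribed', userId: 'u1', at: '2025-01-03T08:00:00Z' },
  { type: 'Subscribed', userId: 'u2', at: '2025-01-20T08:00:00Z' },
  { type: 'Cancelled', userId: 'u1', at: '2025-02-11T08:00:00Z' },
  { type: 'Subscribed', userId: 'u3', at: '2025-03-02T08:00:00Z' },
];

const perMonth = ['2025-01', '2025-02', '2025-03'].map(month =>
  activeSubscribersAtEndOf(subscriptionLog, month)
);
```

With only a current-state table, the February answer would need a separate history table; with events it is a filter and a fold.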

Implementing Event Sourcing

Aggregate Pattern

import { v4 as uuid } from 'uuid';  // used by the event classes below

// Aggregate Root - business entity that events happen to
export abstract class AggregateRoot {
  protected id: string;
  protected version: number = 0;
  private uncommittedEvents: DomainEvent[] = [];

  constructor(id: string) {
    this.id = id;
  }

  // Apply event (mutate state)
  protected abstract applyEvent(event: DomainEvent): void;

  // Record new event
  protected recordEvent(event: DomainEvent): void {
    this.applyEvent(event);
    this.uncommittedEvents.push(event);
    this.version++;
  }

  // Get events to persist
  public getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  // Mark events as persisted
  public markEventsAsCommitted(): void {
    this.uncommittedEvents = [];
  }

  // Replay events to rebuild state
  public loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.applyEvent(event);
      this.version++;
    }
  }

  // Needed by the repository when persisting events
  public getId(): string {
    return this.id;
  }

  public getVersion(): number {
    return this.version;
  }
}

// Order aggregate
export class Order extends AggregateRoot {
  private customerId!: string;
  private items: OrderItem[] = [];
  private status: OrderStatus = OrderStatus.Pending;
  private totalAmount: number = 0;

  // Factory method - creates new order
  public static create(command: CreateOrderCommand): Order {
    const order = new Order(command.orderId);
    
    // Validate business rules
    if (command.items.length === 0) {
      throw new DomainException('Order must have at least one item');
    }

    // Record event
    order.recordEvent(new OrderCreatedEvent(
      command.orderId,
      command.customerId,
      command.items,
      command.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
    ));

    return order;
  }

  // Command - place order
  public place(): void {
    // Business rule
    if (this.status !== OrderStatus.Pending) {
      throw new DomainException('Order can only be placed when pending');
    }

    // Record event
    this.recordEvent(new OrderPlacedEvent(this.id, new Date()));
  }

  // Command - ship order
  public ship(trackingNumber: string): void {
    if (this.status !== OrderStatus.Paid) {
      throw new DomainException('Order must be paid before shipping');
    }

    this.recordEvent(new OrderShippedEvent(this.id, trackingNumber, new Date()));
  }

  // Command - cancel order
  public cancel(reason: string): void {
    if (![OrderStatus.Pending, OrderStatus.Placed].includes(this.status)) {
      throw new DomainException('Cannot cancel order in current status');
    }

    this.recordEvent(new OrderCancelledEvent(this.id, reason, new Date()));
  }

  // Apply events to rebuild state
  protected applyEvent(event: DomainEvent): void {
    if (event instanceof OrderCreatedEvent) {
      this.customerId = event.customerId;
      this.items = event.items;
      this.totalAmount = event.totalAmount;
      this.status = OrderStatus.Pending;
    } else if (event instanceof OrderPlacedEvent) {
      this.status = OrderStatus.Placed;
    } else if (event instanceof OrderPaidEvent) {
      this.status = OrderStatus.Paid;
    } else if (event instanceof OrderShippedEvent) {
      this.status = OrderStatus.Shipped;
    } else if (event instanceof OrderCancelledEvent) {
      this.status = OrderStatus.Cancelled;
    }
  }

  // Getters
  public getStatus(): OrderStatus {
    return this.status;
  }

  public getTotalAmount(): number {
    return this.totalAmount;
  }
}

// Domain events
export interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  timestamp: Date;
  metadata: EventMetadata;
}

export interface EventMetadata {
  userId?: string;
  correlationId?: string;
  causationId?: string;
}

export class OrderCreatedEvent implements DomainEvent {
  public readonly eventType = 'OrderCreated';
  public readonly eventId = uuid();
  public readonly aggregateType = 'Order';
  public readonly timestamp = new Date();

  constructor(
    public readonly aggregateId: string,
    public readonly customerId: string,
    public readonly items: OrderItem[],
    public readonly totalAmount: number,
    public readonly metadata: EventMetadata = {}
  ) {}
}

export class OrderPlacedEvent implements DomainEvent {
  public readonly eventType = 'OrderPlaced';
  public readonly eventId = uuid();
  public readonly aggregateType = 'Order';
  
  constructor(
    public readonly aggregateId: string,
    public readonly timestamp: Date,
    public readonly metadata: EventMetadata = {}
  ) {}
}
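
Because `applyEvent` relies on `instanceof`, events loaded from storage have to be turned back into class instances; plain JSON objects would match none of the branches. One possible shape for that step is a registry keyed by event type. This is a sketch under assumptions: the two event classes are simplified stand-ins for the ones above, and `rehydrate` and `rehydrators` are hypothetical names.

```typescript
// Simplified stand-ins for the event classes above (the real ones carry more fields).
class OrderCreated {
  readonly eventType = 'OrderCreated';
  constructor(readonly aggregateId: string, readonly customerId: string) {}
}

class OrderShipped {
  readonly eventType = 'OrderShipped';
  constructor(readonly aggregateId: string, readonly trackingNumber: string) {}
}

type KnownEvent = OrderCreated | OrderShipped;
type Rehydrator = (raw: Record<string, unknown>) => KnownEvent;

// Hypothetical registry: one rehydrator per persisted event type.
const rehydrators = new Map<string, Rehydrator>([
  ['OrderCreated', raw => new OrderCreated(String(raw['aggregateId']), String(raw['customerId']))],
  ['OrderShipped', raw => new OrderShipped(String(raw['aggregateId']), String(raw['trackingNumber']))],
]);

// Parses a stored JSON payload and rebuilds the matching class instance.
function rehydrate(json: string): KnownEvent {
  const raw = JSON.parse(json) as Record<string, unknown>;
  const eventType = String(raw['eventType']);
  const build = rehydrators.get(eventType);
  if (!build) throw new Error(`Unknown event type: ${eventType}`);
  return build(raw);
}

const stored = JSON.stringify(new OrderShipped('ord-123', 'TRK-1'));
const restored = rehydrate(stored); // an OrderShipped instance again
```

Failing loudly on an unknown type is a deliberate choice in this sketch: silently skipping events during replay would produce an aggregate in the wrong state.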

Event Store Implementation

// Event store interface
export interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
  getEventsSince(aggregateId: string, version: number): Promise<DomainEvent[]>;
  getAllEvents(fromPosition?: number, limit?: number): Promise<DomainEvent[]>;
}

// PostgreSQL event store implementation
export class PostgresEventStore implements EventStore {
  constructor(private db: Database) {}

  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    // Use transaction for atomicity
    await this.db.transaction(async (tx) => {
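      // Optimistic concurrency check. Two concurrent writers can both pass this
      // SELECT; the UNIQUE(aggregate_id, version) constraint then rejects the
      // second INSERT, so callers should treat a unique violation as a conflict too.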
      const currentVersion = await tx.one<{ version: number }>(
        'SELECT COALESCE(MAX(version), 0) as version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );

      if (currentVersion.version !== expectedVersion) {
        throw new ConcurrencyException(
          `Expected version ${expectedVersion}, but aggregate is at version ${currentVersion.version}`
        );
      }

      // Insert events
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        await tx.none(
          `INSERT INTO events (
            event_id, event_type, aggregate_id, aggregate_type, version, 
            data, metadata, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.eventId,
            event.eventType,
            event.aggregateId,
            event.aggregateType,
            expectedVersion + i + 1,
            JSON.stringify(event),
            JSON.stringify(event.metadata),
            event.timestamp
          ]
        );
      }
    });
  }

  // Note: assuming a pg-promise style API, many() rejects when no rows match,
  // so the reads below use manyOrNone(), which resolves with [] instead.
  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const rows = await this.db.manyOrNone(
      `SELECT data FROM events 
       WHERE aggregate_id = $1 
       ORDER BY version ASC`,
      [aggregateId]
    );

    return rows.map(row => this.deserializeEvent(row.data));
  }

  async getEventsSince(aggregateId: string, version: number): Promise<DomainEvent[]> {
    const rows = await this.db.manyOrNone(
      `SELECT data FROM events 
       WHERE aggregate_id = $1 AND version > $2
       ORDER BY version ASC`,
      [aggregateId, version]
    );

    return rows.map(row => this.deserializeEvent(row.data));
  }

  async getAllEvents(fromPosition: number = 0, limit: number = 1000): Promise<DomainEvent[]> {
    const rows = await this.db.manyOrNone(
      `SELECT position, data FROM events 
       WHERE position > $1
       ORDER BY position ASC
       LIMIT $2`,
      [fromPosition, limit]
    );

    // Attach the global position so consumers can checkpoint on it
    // (BIGINT columns typically arrive as strings, hence Number())
    return rows.map(row =>
      Object.assign(this.deserializeEvent(row.data), { position: Number(row.position) })
    );
  }

  private deserializeEvent(data: string | object): DomainEvent {
    // JSONB columns are usually returned already parsed; only parse raw strings
    const event = typeof data === 'string' ? JSON.parse(data) : data;
    // Recreate proper event class instances
    return EventFactory.create(event.eventType, event);
  }
}

// Event store schema
/*
CREATE TABLE events (
  position BIGSERIAL PRIMARY KEY,
  event_id UUID NOT NULL UNIQUE,
  event_type VARCHAR(255) NOT NULL,
  aggregate_id VARCHAR(255) NOT NULL,
  aggregate_type VARCHAR(255) NOT NULL,
  version INT NOT NULL,
  data JSONB NOT NULL,
  metadata JSONB NOT NULL,
  timestamp TIMESTAMPTZ NOT NULL,
  UNIQUE(aggregate_id, version)
);

-- UNIQUE(aggregate_id, version) already provides an index led by aggregate_id
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_timestamp ON events(timestamp);
*/
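
The `expectedVersion` rule is easier to see without SQL. The sketch below is an in-memory stand-in (hypothetical `InMemoryEventStore`, not the PostgreSQL implementation above) showing what happens when two writers load the same aggregate at version 1 and both try to append.

```typescript
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

type StoredEvent = { aggregateId: string; version: number; eventType: string };

// In-memory stand-in for the event store; one array per aggregate stream.
class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  append(aggregateId: string, eventTypes: string[], expectedVersion: number): void {
    const stream = this.streams.get(aggregateId) ?? [];
    // Same rule as the SQL check: the caller must have seen the latest version
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but stream is at version ${stream.length}`
      );
    }
    const appended = eventTypes.map((eventType, i) => ({
      aggregateId,
      eventType,
      version: expectedVersion + i + 1,
    }));
    this.streams.set(aggregateId, [...stream, ...appended]);
  }

  read(aggregateId: string): StoredEvent[] {
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}

const store = new InMemoryEventStore();
store.append('ord-123', ['OrderCreated'], 0);

// Two writers both loaded the aggregate at version 1
store.append('ord-123', ['OrderPlaced'], 1); // first writer wins
let conflict = false;
try {
  store.append('ord-123', ['OrderCancelled'], 1); // second writer is stale
} catch (error) {
  conflict = error instanceof Error && error.name === 'ConcurrencyError';
}
```

The usual reaction to the conflict is to reload the aggregate, re-run the command against the fresh state, and try again, since the business rule may no longer hold.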

Snapshotting for Performance

// Snapshot to avoid replaying thousands of events
export interface Snapshot {
  aggregateId: string;
  aggregateType: string;
  version: number;
  state: any;
  timestamp: Date;
}

export class SnapshotStore {
  constructor(private db: Database) {}

  async saveSnapshot(snapshot: Snapshot): Promise<void> {
    await this.db.none(
      `INSERT INTO snapshots (aggregate_id, aggregate_type, version, state, timestamp)
       VALUES ($1, $2, $3, $4, $5)
       ON CONFLICT (aggregate_id) DO UPDATE SET
         version = $3, state = $4, timestamp = $5`,
      [
        snapshot.aggregateId,
        snapshot.aggregateType,
        snapshot.version,
        JSON.stringify(snapshot.state),
        snapshot.timestamp
      ]
    );
  }

  async getSnapshot(aggregateId: string): Promise<Snapshot | null> {
    const row = await this.db.oneOrNone(
      'SELECT * FROM snapshots WHERE aggregate_id = $1',
      [aggregateId]
    );

    return row ? {
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      version: row.version,
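      // A JSONB column is usually returned already parsed; only parse raw strings
      state: typeof row.state === 'string' ? JSON.parse(row.state) : row.state,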
      timestamp: row.timestamp
    } : null;
  }
}

// Repository with snapshotting
export class EventSourcedRepository<T extends AggregateRoot> {
  private snapshotFrequency = 100;  // Snapshot every 100 events

  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private aggregateFactory: (id: string) => T
  ) {}

  async load(aggregateId: string): Promise<T> {
    // Try to load from snapshot
    const snapshot = await this.snapshotStore.getSnapshot(aggregateId);
    
    let aggregate: T;
    let events: DomainEvent[];

    if (snapshot) {
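      // Load from snapshot and replay events since then.
      // loadFromSnapshot must restore the version as well as the state;
      // otherwise save() computes the wrong expectedVersion.
      aggregate = this.aggregateFactory(aggregateId);
      aggregate.loadFromSnapshot(snapshot.state, snapshot.version);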
      events = await this.eventStore.getEventsSince(aggregateId, snapshot.version);
    } else {
      // No snapshot, replay all events
      aggregate = this.aggregateFactory(aggregateId);
      events = await this.eventStore.getEvents(aggregateId);
    }

    // Replay events
    aggregate.loadFromHistory(events);

    return aggregate;
  }

  async save(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents();
    const expectedVersion = aggregate.getVersion() - events.length;

    // Save events
    await this.eventStore.saveEvents(aggregate.getId(), events, expectedVersion);
    aggregate.markEventsAsCommitted();

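    // Create snapshot when this save crossed a multiple of snapshotFrequency
    // (a plain modulo check misses the threshold when several events are saved at once)
    if (
      Math.floor(aggregate.getVersion() / this.snapshotFrequency) >
      Math.floor(expectedVersion / this.snapshotFrequency)
    ) {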
      await this.snapshotStore.saveSnapshot({
        aggregateId: aggregate.getId(),
        aggregateType: aggregate.getType(),
        version: aggregate.getVersion(),
        state: aggregate.getState(),
        timestamp: new Date()
      });
    }
  }
}
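
Two properties of snapshotting are worth checking in isolation: loading from a snapshot plus the remaining events must give the same result as a full replay, and the snapshot trigger should fire even when one save commits several events. The sketch below uses a hypothetical `Counter` aggregate rather than `Order` to keep it short.

```typescript
type CounterEvent = { type: 'Incremented'; by: number };
type CounterSnapshot = { version: number; total: number };

class Counter {
  version = 0;
  total = 0;

  apply(event: CounterEvent): void {
    this.total += event.by;
    this.version++;
  }

  // Restores state AND version; without the version, the next save
  // would compute the wrong expectedVersion.
  restore(snapshot: CounterSnapshot): void {
    this.total = snapshot.total;
    this.version = snapshot.version;
  }
}

function loadCounter(events: CounterEvent[], snapshot?: CounterSnapshot): Counter {
  const counter = new Counter();
  if (snapshot) counter.restore(snapshot);
  // Version N means N events are already applied, so replay starts at index N
  for (const event of events.slice(counter.version)) counter.apply(event);
  return counter;
}

// True when a save moved the aggregate across a multiple of `frequency`,
// even if several events were committed at once.
function crossedSnapshotThreshold(before: number, after: number, frequency: number): boolean {
  return Math.floor(after / frequency) > Math.floor(before / frequency);
}

const counterEvents: CounterEvent[] = Array.from({ length: 250 }, (_, i) => ({
  type: 'Incremented' as const,
  by: i + 1,
}));

const fullReplay = loadCounter(counterEvents);                    // applies 250 events
const snapshotAt200: CounterSnapshot = { version: 200, total: (200 * 201) / 2 };
const fromSnapshot = loadCounter(counterEvents, snapshotAt200);   // applies only 50 events
```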

Implementing CQRS Projections

// Projection - builds read models from events
export abstract class Projection {
  abstract handles(): string[];  // Event types this projection handles
  abstract project(event: DomainEvent): Promise<void>;
}

// Order details projection - optimized for order detail view
export class OrderDetailsProjection extends Projection {
  constructor(private db: Database) {
    super();
  }

  handles(): string[] {
    return ['OrderCreated', 'OrderPlaced', 'OrderPaid', 'OrderShipped', 'OrderCancelled'];
  }

  async project(event: DomainEvent): Promise<void> {
    if (event instanceof OrderCreatedEvent) {
      await this.db.none(
        `INSERT INTO order_details_view (
          order_id, customer_id, total_amount, status, 
          items, created_at, updated_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
        [
          event.aggregateId,
          event.customerId,
          event.totalAmount,
          'Pending',
          JSON.stringify(event.items),
          event.timestamp,
          event.timestamp
        ]
      );
    } else if (event instanceof OrderShippedEvent) {
      await this.db.none(
        `UPDATE order_details_view 
         SET status = $1, tracking_number = $2, updated_at = $3
         WHERE order_id = $4`,
        ['Shipped', event.trackingNumber, event.timestamp, event.aggregateId]
      );
    }
    // Handle other events...
  }
}

// Customer order history projection - optimized for listing orders
export class CustomerOrderHistoryProjection extends Projection {
  constructor(private db: Database) {
    super();
  }

  handles(): string[] {
    return ['OrderCreated', 'OrderCancelled'];
  }

  async project(event: DomainEvent): Promise<void> {
    if (event instanceof OrderCreatedEvent) {
      await this.db.none(
        `INSERT INTO customer_order_history (
          customer_id, order_id, order_date, total_amount, status
        ) VALUES ($1, $2, $3, $4, $5)`,
        [event.customerId, event.aggregateId, event.timestamp, event.totalAmount, 'Active']
      );
    } else if (event instanceof OrderCancelledEvent) {
      await this.db.none(
        `UPDATE customer_order_history 
         SET status = $1 
         WHERE order_id = $2`,
        ['Cancelled', event.aggregateId]
      );
    }
  }
}

// Projection engine - processes events to update read models
export class ProjectionEngine {
  private projections: Map<string, Projection[]> = new Map();

  constructor(
    private eventStore: EventStore,
    private checkpointStore: CheckpointStore
  ) {}

  registerProjection(projection: Projection): void {
    const eventTypes = projection.handles();
    
    for (const eventType of eventTypes) {
      if (!this.projections.has(eventType)) {
        this.projections.set(eventType, []);
      }
      this.projections.get(eventType)!.push(projection);
    }
  }

  async start(): Promise<void> {
    console.log('Starting projection engine...');
    
    // Load last processed position
    let position = await this.checkpointStore.getCheckpoint('projections');

    while (true) {
      // Fetch batch of events
      const events = await this.eventStore.getAllEvents(position, 100);
      
      if (events.length === 0) {
        // No new events, wait and retry
        await this.sleep(1000);
        continue;
      }

      // Process each event
      for (const event of events) {
        const handlers = this.projections.get(event.eventType) || [];
        
        // Project to all registered projections
        await Promise.all(
          handlers.map(projection => projection.project(event))
        );

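        // Advance to the event's stored position when the store exposes it;
        // BIGSERIAL values can have gaps, so counting events can drift.
        position = (event as DomainEvent & { position?: number }).position ?? position + 1;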
      }

      // Save checkpoint
      await this.checkpointStore.saveCheckpoint('projections', position);
      console.log(`Processed ${events.length} events, position: ${position}`);
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Checkpoint store - tracks projection progress
export class CheckpointStore {
  constructor(private db: Database) {}

  async getCheckpoint(name: string): Promise<number> {
    const row = await this.db.oneOrNone(
      'SELECT position FROM checkpoints WHERE name = $1',
      [name]
    );
    // BIGINT values may arrive as strings, so normalize to a number
    return Number(row?.position ?? 0);
  }

  async saveCheckpoint(name: string, position: number): Promise<void> {
    await this.db.none(
      `INSERT INTO checkpoints (name, position, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (name) DO UPDATE SET
         position = $2, updated_at = NOW()`,
      [name, position]
    );
  }
}
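
Because the checkpoint is saved after a batch, a crash in between means the same events are delivered again on restart. A projection therefore needs to tolerate duplicates. The following in-memory sketch shows one way to do that by remembering the last applied position; `PositionedEvent` and `OrderCountProjection` are hypothetical names, and a real projection would persist the position in the same transaction as the read-model update.

```typescript
type PositionedEvent = { position: number; eventType: string; orderId: string };

class OrderCountProjection {
  private lastPosition = 0;
  private readonly countsByType = new Map<string, number>();

  // Safe to call repeatedly with the same event (at-least-once delivery).
  // Returns true only when the event was actually applied.
  project(event: PositionedEvent): boolean {
    if (event.position <= this.lastPosition) return false; // already applied
    const current = this.countsByType.get(event.eventType) ?? 0;
    this.countsByType.set(event.eventType, current + 1);
    this.lastPosition = event.position; // taken from the event, so gaps are harmless
    return true;
  }

  count(eventType: string): number {
    return this.countsByType.get(eventType) ?? 0;
  }

  checkpoint(): number {
    return this.lastPosition;
  }
}

const projection = new OrderCountProjection();
const batch: PositionedEvent[] = [
  { position: 1, eventType: 'OrderCreated', orderId: 'ord-1' },
  { position: 2, eventType: 'OrderCreated', orderId: 'ord-2' },
  { position: 5, eventType: 'OrderShipped', orderId: 'ord-1' }, // positions 3 and 4 were never committed
];

batch.forEach(event => projection.project(event));

// Simulate a crash before the checkpoint was saved: the batch is delivered again
const reapplied = batch.filter(event => projection.project(event)).length;
```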

Handling Eventual Consistency

// Problem: Read model may not be immediately updated after command
async function placeOrder() {
  // Execute command
  await commandBus.execute(new PlaceOrderCommand('ord-123'));
  
  // Try to read immediately
  const order = await queryBus.execute(new GetOrderQuery('ord-123'));
  
  // May still show old status! Projection hasn't processed event yet
  console.log(order.status);  // Might be "Pending" instead of "Placed"
}

// Solution 1: Return version from command, poll until projection catches up
export class CommandResult {
  constructor(
    public readonly aggregateId: string,
    public readonly version: number
  ) {}
}

async function placeOrderWithConsistency() {
  // Command returns version
  const result: CommandResult = await commandBus.execute(
    new PlaceOrderCommand('ord-123')
  );
  
  // Poll until projection has processed this version
  const order = await pollUntilVersion(
    'ord-123',
    result.version,
    { maxAttempts: 10, delayMs: 100 }
  );
  
  console.log(order.status);  // Guaranteed to be "Placed"
}

async function pollUntilVersion(
  orderId: string,
  expectedVersion: number,
  options: { maxAttempts: number; delayMs: number }
): Promise<Order> {
  for (let i = 0; i < options.maxAttempts; i++) {
    const order = await queryBus.execute(new GetOrderQuery(orderId));
    
    if (order.version >= expectedVersion) {
      return order;  // Projection caught up
    }
    
    await new Promise(resolve => setTimeout(resolve, options.delayMs));
  }
  
  throw new Error('Projection did not catch up in time');
}

// Solution 2: Use SSE/WebSocket to notify when projection updates
export class ProjectionNotifier {
  private subscribers: Map<string, Set<(event: DomainEvent) => void>> = new Map();

  subscribe(aggregateId: string, callback: (event: DomainEvent) => void): () => void {
    if (!this.subscribers.has(aggregateId)) {
      this.subscribers.set(aggregateId, new Set());
    }
    this.subscribers.get(aggregateId)!.add(callback);

    // Return unsubscribe function
    return () => {
      this.subscribers.get(aggregateId)?.delete(callback);
    };
  }

  notify(event: DomainEvent): void {
    const callbacks = this.subscribers.get(event.aggregateId);
    if (callbacks) {
      callbacks.forEach(callback => callback(event));
    }
  }
}

// Client-side: Wait for projection update
async function placeOrderWithNotification() {
  const orderId = 'ord-123';
  
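  // Subscribe to updates
  const updates = new EventSource(`/api/orders/${orderId}/updates`);

  // Register the listener before sending the command, so an update that
  // arrives while the command is still in flight is not missed
  const projectionUpdated = new Promise((resolve) => {
    updates.addEventListener('OrderProjectionUpdated', (event) => {
      updates.close();
      resolve(event);
    });
  });

  // Execute command
  await commandBus.execute(new PlaceOrderCommand(orderId));

  // Wait for projection update event
  await projectionUpdated;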
  
  // Now query is guaranteed to be up-to-date
  const order = await queryBus.execute(new GetOrderQuery(orderId));
  console.log(order.status);  // "Placed"
}
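
Both solutions depend on the read model knowing which aggregate version it reflects. A minimal sketch of that building block, assuming the projection stores a `version` column next to each row (the names `OrderReadModel` and `readAtLeast` are hypothetical):

```typescript
type OrderView = { orderId: string; status: string; version: number };
type ReadResult =
  | { kind: 'fresh'; order: OrderView }
  | { kind: 'stale'; seenVersion: number };

class OrderReadModel {
  private readonly rows = new Map<string, OrderView>();

  // Called by the projection after it has applied an event
  upsert(order: OrderView): void {
    this.rows.set(order.orderId, order);
  }

  // Returns the row only if the projection has reached `minVersion`;
  // otherwise reports how far it has got so the caller can retry or wait.
  readAtLeast(orderId: string, minVersion: number): ReadResult {
    const row = this.rows.get(orderId);
    if (row && row.version >= minVersion) return { kind: 'fresh', order: row };
    return { kind: 'stale', seenVersion: row?.version ?? 0 };
  }
}

const readModel = new OrderReadModel();
readModel.upsert({ orderId: 'ord-123', status: 'Pending', version: 1 });

// The command just produced version 2, but the projection has not caught up yet
const beforeCatchUp = readModel.readAtLeast('ord-123', 2);

// The projection processes the OrderPlaced event
readModel.upsert({ orderId: 'ord-123', status: 'Placed', version: 2 });
const afterCatchUp = readModel.readAtLeast('ord-123', 2);
```

Returning an explicit `stale` result, rather than old data, leaves the decision to the caller: poll again, wait for a notification, or show the user an optimistic state.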

Event Versioning

// Problem: Events evolve over time
// Version 1: OrderCreated event
interface OrderCreatedEventV1 {
  orderId: string;
  customerId: string;
  items: OrderItem[];
}

// Version 2: Added total amount (breaking change if not handled)
interface OrderCreatedEventV2 {
  orderId: string;
  customerId: string;
  items: OrderItem[];
  totalAmount: number;  // New field
}

// Solution: Upcasting - convert old events to new format
export class EventUpcaster {
  upcast(event: DomainEvent): DomainEvent {
    if (event.eventType === 'OrderCreated') {
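      // Test for undefined, not falsiness: a V2 event with totalAmount 0 is already current
      if (event.data.totalAmount === undefined) {
        // V1 event, upcast to V2
        return {
          ...event,
          data: {
            ...event.data,
            totalAmount: event.data.items.reduce(
              (sum: number, item: OrderItem) => sum + item.price * item.quantity,
              0
            )
          },
          schemaVersion: 2  // event schema version, distinct from the aggregate version
        };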
        };
      }
    }
    return event;
  }
}

// Upcaster in event store
export class EventStoreWithUpcasting {
  constructor(
    private eventStore: EventStore,
    private upcaster: EventUpcaster
  ) {}

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const events = await this.eventStore.getEvents(aggregateId);
    
    // Upcast all events to latest version
    return events.map(event => this.upcaster.upcast(event));
  }
}

// Alternative: Keep multiple event handlers
export class Order extends AggregateRoot {
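  protected applyEvent(event: DomainEvent): void {
    // OrderCreatedEventV1/V2 are interfaces, which do not exist at runtime,
    // so instanceof cannot tell them apart; discriminate on the payload instead
    if (event.eventType !== 'OrderCreated') return;

    const payload = event as unknown as OrderCreatedEventV1 | OrderCreatedEventV2;
    if ('totalAmount' in payload) {
      this.applyOrderCreatedV2(payload);
    } else {
      this.applyOrderCreatedV1(payload);
    }
  }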

  private applyOrderCreatedV1(event: OrderCreatedEventV1): void {
    this.customerId = event.customerId;
    this.items = event.items;
    // Calculate total from items
    this.totalAmount = event.items.reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );
  }

  private applyOrderCreatedV2(event: OrderCreatedEventV2): void {
    this.customerId = event.customerId;
    this.items = event.items;
    // Use total from event
    this.totalAmount = event.totalAmount;
  }
}
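
Once an event type has gone through more than one revision, upcasters are commonly chained so that each step only knows about the next version. The sketch below assumes every stored event carries an explicit `schemaVersion` and a `data` payload; the registry key format is hypothetical, and the V2 to V3 step (a default currency) is invented purely to show the chaining.

```typescript
type RawEvent = { eventType: string; schemaVersion: number; data: Record<string, unknown> };
type Upcaster = (event: RawEvent) => RawEvent;
type Item = { price: number; quantity: number };

// Hypothetical registry: `${eventType}@${schemaVersion}` -> step to the next version
const upcasters = new Map<string, Upcaster>([
  ['OrderCreated@1', event => {
    // V1 -> V2: derive totalAmount from the items, as in the upcaster above
    const items = event.data['items'] as Item[];
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
    return { ...event, schemaVersion: 2, data: { ...event.data, totalAmount } };
  }],
  // V2 -> V3: illustrative step only, not part of the article's model
  ['OrderCreated@2', event => ({
    ...event,
    schemaVersion: 3,
    data: { ...event.data, currency: 'USD' },
  })],
]);

// Applies upcasters one version at a time until none is registered
function upcastToLatest(event: RawEvent): RawEvent {
  let current = event;
  for (;;) {
    const step = upcasters.get(`${current.eventType}@${current.schemaVersion}`);
    if (!step) return current; // already at the latest known version
    current = step(current);
  }
}

const v1: RawEvent = {
  eventType: 'OrderCreated',
  schemaVersion: 1,
  data: { items: [{ price: 10, quantity: 2 }, { price: 5, quantity: 1 }] },
};
const latest = upcastToLatest(v1); // schemaVersion 3, with totalAmount and currency
```

The stored events stay untouched; upcasting happens on read, so the aggregate and projections only ever need to understand the latest shape.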

Rebuilding Projections

// Rebuild projection from scratch (useful for bug fixes or new projections)
export class ProjectionRebuilder {
  constructor(
    private eventStore: EventStore,
    private projection: Projection,
    private db: Database
  ) {}

  async rebuild(): Promise<void> {
    console.log('Starting projection rebuild...');

    // Clear existing data
    await this.clearProjection();

    let position = 0;
    let processed = 0;

    while (true) {
      // Fetch batch of events
      const events = await this.eventStore.getAllEvents(position, 1000);
      
      if (events.length === 0) {
        break;  // No more events
      }

      // Process events
      for (const event of events) {
        if (this.projection.handles().includes(event.eventType)) {
          await this.projection.project(event);
          processed++;
        }
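        // Advance to the event's stored position when the store exposes it;
        // BIGSERIAL values can have gaps, so counting events can drift.
        position = (event as DomainEvent & { position?: number }).position ?? position + 1;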
      }

      console.log(`Rebuilt ${processed} events, position: ${position}`);
    }

    console.log('Projection rebuild complete');
  }

  private async clearProjection(): Promise<void> {
    // Clear projection-specific tables
    await this.db.none('TRUNCATE TABLE order_details_view');
  }
}
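
Truncating the live table means readers see an empty or partial view until the rebuild finishes. A common alternative is to build into a shadow copy and swap it in once complete. The sketch below shows the idea in memory, with hypothetical names (`RebuildableView`, `applyToView`); with PostgreSQL the equivalent would be building a second table and renaming it.

```typescript
type OrderEvent = {
  position: number;
  eventType: 'OrderCreated' | 'OrderCancelled';
  orderId: string;
};
type OrderRow = { orderId: string; status: string };

function applyToView(view: Map<string, OrderRow>, event: OrderEvent): void {
  const status = event.eventType === 'OrderCreated' ? 'Active' : 'Cancelled';
  view.set(event.orderId, { orderId: event.orderId, status });
}

class RebuildableView {
  private live = new Map<string, OrderRow>();

  get(orderId: string): OrderRow | undefined {
    return this.live.get(orderId);
  }

  // Builds a fresh copy from the full log, then swaps it in with one assignment,
  // so readers never observe a half-built view.
  rebuild(log: OrderEvent[]): number {
    const shadow = new Map<string, OrderRow>();
    let lastPosition = 0;
    for (const event of log) {
      applyToView(shadow, event);
      lastPosition = event.position;
    }
    this.live = shadow;
    return lastPosition; // checkpoint from which the live projector resumes
  }
}

const orderLog: OrderEvent[] = [
  { position: 1, eventType: 'OrderCreated', orderId: 'ord-1' },
  { position: 2, eventType: 'OrderCreated', orderId: 'ord-2' },
  { position: 4, eventType: 'OrderCancelled', orderId: 'ord-1' },
];

const rebuildable = new RebuildableView();
const resumeFrom = rebuildable.rebuild(orderLog);
```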

Real-World Performance Metrics

// Comparison: Traditional vs CQRS/ES

interface PerformanceMetrics {
  operation: string;
  traditional: number;  // milliseconds
  cqrsES: number;
  improvement: string;
}

const metrics: PerformanceMetrics[] = [
  {
    operation: 'Write operation (insert/update)',
    traditional: 50,
    cqrsES: 80,
    improvement: '1.6x slower (extra cost of event storage)'
  },
  {
    operation: 'Simple read (by ID)',
    traditional: 20,
    cqrsES: 5,
    improvement: '4x faster (optimized read model)'
  },
  {
    operation: 'Complex query (joins, aggregations)',
    traditional: 500,
    cqrsES: 50,
    improvement: '10x faster (denormalized read model)'
  },
  {
    operation: 'Audit query (change history)',
    traditional: 2000,
    cqrsES: 100,
    improvement: '20x faster (events stored natively)'
  },
  {
    operation: 'Time travel query',
    traditional: 0,  // placeholder: not supported out of the box, not a measured 0 ms
    cqrsES: 200,
    improvement: 'Needs separate history tables vs built in'
  }
];

// Storage comparison
interface StorageComparison {
  dataset: string;
  traditional: string;
  cqrsES: string;
  overhead: string;
}

const storage: StorageComparison[] = [
  {
    dataset: '1M orders',
    traditional: '5GB',
    cqrsES: '15GB',
    overhead: '3x (events + projections)'
  },
  {
    dataset: 'With 5 projections',
    traditional: '5GB',
    cqrsES: '20GB',
    overhead: '4x (multiple read models)'
  }
];

// Note: Storage is cheaper than compute
// CQRS/ES trades storage for query performance and auditability
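
The improvement figures are plain ratios of the two latency columns. As a quick check of the arithmetic (the helper names are hypothetical):

```typescript
// Ratio > 1 means CQRS/ES is faster; < 1 means it is slower.
function speedup(traditionalMs: number, cqrsEsMs: number): number {
  return traditionalMs / cqrsEsMs;
}

// Relative latency change in percent; positive means CQRS/ES takes longer.
function latencyChangePercent(traditionalMs: number, cqrsEsMs: number): number {
  return ((cqrsEsMs - traditionalMs) * 100) / traditionalMs;
}

const writeChange = latencyChangePercent(50, 80); // writes: 60% more latency
const simpleRead = speedup(20, 5);                // 4x
const complexQuery = speedup(500, 50);            // 10x
const auditQuery = speedup(2000, 100);            // 20x
```

Whether the trade is worthwhile depends on the read:write ratio: at 100 reads per write, a slower write path is paid once for every hundred faster reads.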

Conclusion

CQRS and Event Sourcing are powerful patterns that excel in complex domains requiring audit trails, temporal queries, and independent read/write scaling. Financial services gain a complete record of every state change, e-commerce platforms report read cost reductions of around 70%, and regulated industries can back their compliance work with immutable event logs.

However, these patterns introduce complexity: eventual consistency requires careful UX design, event versioning needs planning, and projection rebuilds must be accounted for. The key to success is applying these patterns selectively—use them for bounded contexts that truly need these capabilities, not as a default architecture for every system.

Start with a single aggregate, implement a basic event store, create one projection, and measure the benefits. If audit trails and temporal queries provide clear value, expand to more aggregates. If the complexity outweighs benefits, traditional CRUD may be the better choice.

Next Steps

  1. Identify a bounded context that needs audit trails or complex queries
  2. Design your aggregates with business rules and domain events
  3. Implement event store using PostgreSQL or EventStoreDB
  4. Create your first projection optimized for a specific query
  5. Handle eventual consistency with versioning or notifications
  6. Plan event versioning strategy before going to production
  7. Monitor projection lag to ensure read models stay up-to-date
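
For the last step, projection lag can be approximated as the distance between the newest position in the event store and the projection's checkpoint. A minimal sketch, assuming both numbers are available to the monitoring code (the function name and threshold are hypothetical):

```typescript
type LagReport = { lagPositions: number; healthy: boolean };

// headPosition: highest position currently in the event store
// checkpoint:   last position the projection has processed
function projectionLag(headPosition: number, checkpoint: number, maxLag: number): LagReport {
  // Clamp at zero in case the checkpoint was read slightly after the head
  const lagPositions = Math.max(0, headPosition - checkpoint);
  return { lagPositions, healthy: lagPositions <= maxLag };
}

const keepingUp = projectionLag(1500, 1480, 100);   // 20 positions behind
const fallingBehind = projectionLag(1500, 900, 100); // 600 positions behind
```

Since positions can contain gaps, this measures distance in positions, which is an upper bound on the number of unprocessed events.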

Topics Covered

CQRS Event Sourcing, CQRS Pattern, Event Sourcing, Event Store, Domain-Driven Design, Eventually Consistent
