Event-Driven Architecture with Apache Kafka: Production Patterns for Scale

Build scalable event-driven systems with Apache Kafka. Complete guide with producer/consumer patterns, schema design, error handling, and production deployment strategies.

15 minutes
Intermediate
2025-10-17

Event-driven architecture transforms how distributed systems communicate, enabling real-time data processing with 10-100x lower latency than traditional request-response patterns. Apache Kafka, processing over 1 trillion messages daily across major tech companies, has become the backbone of modern event-driven systems. With Kafka 3.6 making KRaft mode production-ready (eliminating ZooKeeper) and 65% enterprise adoption in 2025, event streaming is now accessible to organizations of all sizes. Teams implementing event-driven patterns report 70% reduction in system coupling, 5x improvement in scalability, and 80% faster feature development through loose coupling.

This comprehensive guide shows you how to architect, implement, and operate production-grade event-driven systems using Apache Kafka with real code examples and battle-tested patterns.

Understanding Event-Driven Architecture

Event-driven architecture (EDA) is a design pattern where systems communicate through events—immutable facts about something that happened:

Key characteristics:

  • Asynchronous communication: Producers publish events without waiting for consumers
  • Loose coupling: Producers don't know about consumers; add new consumers without changing producers
  • Event log: All events stored in durable log for replay and audit
  • Real-time processing: Events processed as they occur, enabling real-time analytics and reactions
  • Scalability: Producers and consumers scale independently

Comparison with synchronous patterns:

Pattern Latency Coupling Scaling Use Case
Synchronous API 50-500ms High Coupled Request-response, CRUD
Event-Driven 1-50ms Low Independent Real-time, analytics, notifications
Message Queue 10-100ms Medium Independent Task processing, jobs

Kafka Architecture Fundamentals

Core Concepts

// Topic: Category of events (like a database table)
interface Topic {
  name: string;              // e.g., "order-events"
  partitions: number;        // Parallel processing units
  replicationFactor: number; // Fault tolerance (copies across brokers)
  retentionMs: number;       // How long to keep events
}

// Event: Immutable fact with key-value structure
interface KafkaEvent<T> {
  key: string;        // Determines partition (same key → same partition → ordering)
  value: T;           // Event payload
  timestamp: number;  // When event occurred
  headers: Record<string, string>;  // Metadata
}

// Producer: Publishes events to topics
interface Producer {
  send(topic: string, event: KafkaEvent): Promise<RecordMetadata>;
  sendBatch(topic: string, events: KafkaEvent[]): Promise<RecordMetadata[]>;
  flush(): Promise<void>;
}

// Consumer: Reads events from topics
interface Consumer {
  subscribe(topics: string[]): void;
  poll(timeout: number): Promise<ConsumerRecord[]>;
  commit(): Promise<void>;
  seek(partition: number, offset: number): void;
}

// Consumer Group: Multiple consumers coordinate to process events in parallel
interface ConsumerGroup {
  groupId: string;     // Identifies the consumer group
  members: Consumer[]; // Consumers in the group
  // Each partition assigned to only ONE consumer in the group
  // Enables parallel processing + fault tolerance
}

Event Flow Example

// Order service publishes event
const orderEvent: OrderCreatedEvent = {
  type: 'OrderCreated',
  orderId: 'ord-123',
  customerId: 'cust-456',
  items: [{ productId: 'prod-789', quantity: 2, price: 29.99 }],
  totalAmount: 59.98,
  createdAt: new Date().toISOString()
};

// Send to Kafka (partitioned by customerId for ordering)
await producer.send({
  topic: 'order-events',
  key: orderEvent.customerId,  // Same customer → same partition → ordered
  value: JSON.stringify(orderEvent),
  headers: {
    'event-type': 'OrderCreated',
    'version': '1.0',
    'source': 'order-service'
  }
});

// Multiple consumers can process this event independently:
// 1. Inventory service: Reserve items
// 2. Email service: Send order confirmation
// 3. Analytics service: Update dashboards
// 4. Recommendation service: Update user profile
// 5. Fraud detection: Check for suspicious patterns

// All without the order service knowing about any of them!

Setting Up Production Kafka with KRaft

# docker-compose.yml - Modern Kafka setup (no ZooKeeper!)
version: '3.8'

services:
  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'  # Explicit topic creation
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'  # Generate with kafka-storage random-uuid
    volumes:
      - kafka-1-data:/var/lib/kafka/data
    networks:
      - kafka-network

  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-2
    environment:
      KAFKA_NODE_ID: 2
      # ... similar config with different NODE_ID
    volumes:
      - kafka-2-data:/var/lib/kafka/data
    networks:
      - kafka-network

  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-3
    environment:
      KAFKA_NODE_ID: 3
      # ... similar config
    volumes:
      - kafka-3-data:/var/lib/kafka/data
    networks:
      - kafka-network

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-1:9092,kafka-2:9092,kafka-3:9092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    ports:
      - '8081:8081'
    networks:
      - kafka-network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      KAFKA_CLUSTERS_0_NAME: production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    ports:
      - '8080:8080'
    networks:
      - kafka-network

volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:

networks:
  kafka-network:
    driver: bridge

Producer Patterns

Pattern 1: Idempotent Producer

// Prevents duplicate events even with retries
import { Kafka, Producer, ProducerRecord } from 'kafkajs';

export class IdempotentKafkaProducer {
  private producer: Producer;

  constructor(brokers: string[]) {
    const kafka = new Kafka({
      clientId: 'order-service',
      brokers: brokers,
      retry: {
        retries: 5,
        initialRetryTime: 100,
        factor: 2
      }
    });

    this.producer = kafka.producer({
      // Enable idempotence - prevents duplicates
      idempotent: true,
      // Max in-flight requests (1-5 for strict ordering)
      maxInFlightRequests: 5,
      // Require acknowledgment from all in-sync replicas
      acks: -1,
      // Timeout for produce request
      timeout: 30000,
      // Compression
      compression: CompressionTypes.GZIP
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publishEvent<T>(
    topic: string,
    key: string,
    event: T,
    headers?: Record<string, string>
  ): Promise<void> {
    try {
      const message: ProducerRecord = {
        topic,
        messages: [{
          key,
          value: JSON.stringify(event),
          headers: {
            ...headers,
            timestamp: Date.now().toString(),
            producer: 'order-service'
          }
        }]
      };

      const result = await this.producer.send(message);
      
      console.log('Event published:', {
        topic,
        partition: result[0].partition,
        offset: result[0].offset
      });
    } catch (error) {
      console.error('Failed to publish event:', error);
      // Idempotent producer will retry automatically
      // If all retries fail, handle error appropriately
      throw error;
    }
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }
}

// Usage
const producer = new IdempotentKafkaProducer(['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092']);
await producer.connect();

await producer.publishEvent(
  'order-events',
  'cust-123',  // Key ensures same customer's events go to same partition (ordering)
  {
    type: 'OrderCreated',
    orderId: 'ord-456',
    customerId: 'cust-123',
    amount: 99.99
  },
  { 'event-type': 'OrderCreated', 'version': '1.0' }
);

Pattern 2: Transactional Producer

// Atomic writes across multiple topics (all-or-nothing)
export class TransactionalKafkaProducer {
  private producer: Producer;

  constructor(brokers: string[]) {
    const kafka = new Kafka({
      clientId: 'payment-service',
      brokers
    });

    this.producer = kafka.producer({
      idempotent: true,
      transactional: true,  // Enable transactions
      transactionalId: `payment-service-${process.env.HOSTNAME || 'local'}`,
      maxInFlightRequests: 1  // Required for transactions
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publishAtomically(operations: Array<{
    topic: string;
    key: string;
    value: any;
  }>): Promise<void> {
    // Begin transaction
    const transaction = await this.producer.transaction();

    try {
      // Send all messages within transaction
      for (const op of operations) {
        await transaction.send({
          topic: op.topic,
          messages: [{
            key: op.key,
            value: JSON.stringify(op.value)
          }]
        });
      }

      // Commit transaction (all messages visible atomically)
      await transaction.commit();
      console.log('Transaction committed:', operations.length, 'messages');
    } catch (error) {
      // Abort transaction (no messages visible)
      await transaction.abort();
      console.error('Transaction aborted:', error);
      throw error;
    }
  }
}

// Usage: Process payment and publish multiple events atomically
const producer = new TransactionalKafkaProducer(['kafka-1:9092']);
await producer.connect();

// Either ALL of these are published, or NONE
await producer.publishAtomically([
  {
    topic: 'payment-events',
    key: 'payment-789',
    value: { type: 'PaymentProcessed', paymentId: 'payment-789', amount: 99.99 }
  },
  {
    topic: 'order-events',
    key: 'ord-456',
    value: { type: 'OrderPaid', orderId: 'ord-456', paymentId: 'payment-789' }
  },
  {
    topic: 'accounting-events',
    key: 'cust-123',
    value: { type: 'RevenueRecorded', customerId: 'cust-123', amount: 99.99 }
  }
]);

Pattern 3: Batch Producer with Compression

// High-throughput batch publishing
export class BatchKafkaProducer {
  private producer: Producer;
  private batch: Array<{ topic: string; key: string; value: any }> = [];
  private batchSize = 100;
  private flushInterval: NodeJS.Timeout;

  constructor(brokers: string[], batchSize = 100, flushIntervalMs = 1000) {
    const kafka = new Kafka({
      clientId: 'analytics-service',
      brokers
    });

    this.producer = kafka.producer({
      idempotent: true,
      // Batch settings
      compression: CompressionTypes.SNAPPY,  // Fast compression
      batch: {
        size: 16384,  // 16KB batch size
        lingerMs: 10   // Wait up to 10ms for more messages
      }
    });

    this.batchSize = batchSize;

    // Auto-flush on interval
    this.flushInterval = setInterval(() => {
      this.flush();
    }, flushIntervalMs);
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  addToBatch(topic: string, key: string, value: any): void {
    this.batch.push({ topic, key, value });

    if (this.batch.length >= this.batchSize) {
      this.flush();
    }
  }

  async flush(): Promise<void> {
    if (this.batch.length === 0) return;

    const messagesToSend = [...this.batch];
    this.batch = [];

    try {
      // Group by topic for efficient sending
      const byTopic = new Map<string, Array<{ key: string; value: any }>>();
      
      for (const msg of messagesToSend) {
        if (!byTopic.has(msg.topic)) {
          byTopic.set(msg.topic, []);
        }
        byTopic.get(msg.topic)!.push({
          key: msg.key,
          value: msg.value
        });
      }

      // Send all topics
      await Promise.all(
        Array.from(byTopic.entries()).map(([topic, messages]) =>
          this.producer.send({
            topic,
            messages: messages.map(m => ({
              key: m.key,
              value: JSON.stringify(m.value)
            }))
          })
        )
      );

      console.log(`Flushed ${messagesToSend.length} messages`);
    } catch (error) {
      console.error('Flush failed:', error);
      // Re-add to batch for retry
      this.batch.unshift(...messagesToSend);
    }
  }

  async disconnect(): Promise<void> {
    clearInterval(this.flushInterval);
    await this.flush();  // Final flush
    await this.producer.disconnect();
  }
}

// Usage: High-volume analytics events
const producer = new BatchKafkaProducer(['kafka-1:9092'], 500, 2000);
await producer.connect();

// Add many events efficiently
for (let i = 0; i < 10000; i++) {
  producer.addToBatch(
    'page-view-events',
    `user-${i % 100}`,
    {
      type: 'PageView',
      userId: `user-${i % 100}`,
      page: '/products',
      timestamp: Date.now()
    }
  );
}

// Batches automatically sent every 2 seconds or when 500 events accumulated

Consumer Patterns

Pattern 1: At-Least-Once Consumer

// Guaranteed processing with manual offset management
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';

export class AtLeastOnceConsumer {
  private consumer: Consumer;

  constructor(brokers: string[], groupId: string) {
    const kafka = new Kafka({
      clientId: 'inventory-service',
      brokers
    });

    this.consumer = kafka.consumer({
      groupId,
      // Start from earliest unprocessed message
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      // Disable auto-commit for manual control
      autoCommit: false
    });
  }

  async connect(): Promise<void> {
    await this.consumer.connect();
  }

  async subscribe(topics: string[]): Promise<void> {
    await this.consumer.subscribe({
      topics,
      fromBeginning: false  // Start from last committed offset
    });
  }

  async startConsuming(handler: (message: any) => Promise<void>): Promise<void> {
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        const event = JSON.parse(message.value?.toString() || '{}');
        
        try {
          console.log('Processing event:', {
            topic,
            partition,
            offset: message.offset,
            key: message.key?.toString(),
            event: event.type
          });

          // Process the message
          await handler(event);

          // ONLY commit after successful processing
          await this.consumer.commitOffsets([{
            topic,
            partition,
            offset: (parseInt(message.offset) + 1).toString()
          }]);

          console.log('Event processed and committed');
        } catch (error) {
          console.error('Processing failed:', error);
          // Don't commit - message will be reprocessed
          // Consider dead letter queue for poison pills
          
          // Optionally: Send to DLQ after N retries
          const retryCount = parseInt(message.headers?.retryCount?.toString() || '0');
          if (retryCount >= 3) {
            await this.sendToDeadLetterQueue(topic, message, error);
            // Commit to skip the poison pill
            await this.consumer.commitOffsets([{
              topic,
              partition,
              offset: (parseInt(message.offset) + 1).toString()
            }]);
          }
          // Otherwise: Don't commit, will retry on next poll
        }
      }
    });
  }

  private async sendToDeadLetterQueue(
    originalTopic: string,
    message: any,
    error: any
  ): Promise<void> {
    // Send to DLQ topic for manual investigation
    const dlqProducer = new IdempotentKafkaProducer(['kafka-1:9092']);
    await dlqProducer.connect();
    
    await dlqProducer.publishEvent(
      `${originalTopic}.dlq`,
      message.key?.toString() || 'unknown',
      {
        originalMessage: JSON.parse(message.value.toString()),
        error: error.message,
        failedAt: new Date().toISOString()
      }
    );
    
    await dlqProducer.disconnect();
  }

  async disconnect(): Promise<void> {
    await this.consumer.disconnect();
  }
}

// Usage
const consumer = new AtLeastOnceConsumer(['kafka-1:9092'], 'inventory-service-group');
await consumer.connect();
await consumer.subscribe(['order-events']);

await consumer.startConsuming(async (event) => {
  if (event.type === 'OrderCreated') {
    // Reserve inventory
    await inventoryService.reserve(event.orderId, event.items);
  }
});

Pattern 2: Parallel Consumer with Batch Processing

// Process events in parallel within partition
export class ParallelBatchConsumer {
  private consumer: Consumer;

  constructor(brokers: string[], groupId: string, private batchSize = 10) {
    const kafka = new Kafka({ clientId: 'email-service', brokers });
    
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 60000,  // Longer timeout for batch processing
      maxWaitTimeInMs: 1000,  // Wait up to 1s for batch
      maxBytesPerPartition: 1048576  // 1MB per partition per fetch
    });
  }

  async startConsuming(handler: (events: any[]) => Promise<void>): Promise<void> {
    await this.consumer.run({
      eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
        const messages = batch.messages;
        console.log(`Processing batch: ${messages.length} messages`);

        // Process in chunks
        for (let i = 0; i < messages.length; i += this.batchSize) {
          const chunk = messages.slice(i, i + this.batchSize);
          
          // Parse events
          const events = chunk.map(m => 
            JSON.parse(m.value?.toString() || '{}')
          );

          try {
            // Process batch in parallel
            await handler(events);

            // Commit last offset in chunk
            const lastMessage = chunk[chunk.length - 1];
            resolveOffset(lastMessage.offset);
            await heartbeat();  // Keep session alive
          } catch (error) {
            console.error('Batch processing failed:', error);
            // Will retry the batch
            break;
          }
        }
      }
    });
  }
}

// Usage: Send batch of emails efficiently
const consumer = new ParallelBatchConsumer(['kafka-1:9092'], 'email-service-group', 50);
await consumer.startConsuming(async (events) => {
  // Send 50 emails in parallel
  await Promise.all(events.map(event => 
    emailService.send({
      to: event.email,
      subject: event.subject,
      body: event.body
    })
  ));
  console.log(`Sent ${events.length} emails`);
});

Schema Management with Avro

// Schema Registry for event schema evolution
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';
import avro from 'avro-js';

const registry = new SchemaRegistry({
  host: 'http://schema-registry:8081'
});

// Define Avro schema
const orderCreatedSchema = {
  type: 'record',
  name: 'OrderCreated',
  namespace: 'com.company.orders',
  fields: [
    { name: 'orderId', type: 'string' },
    { name: 'customerId', type: 'string' },
    { name: 'totalAmount', type: 'double' },
    { name: 'items', type: {
      type: 'array',
      items: {
        type: 'record',
        name: 'OrderItem',
        fields: [
          { name: 'productId', type: 'string' },
          { name: 'quantity', type: 'int' },
          { name: 'price', type: 'double' }
        ]
      }
    }},
    { name: 'createdAt', type: 'string' }
  ]
};

// Register schema
const { id: schemaId } = await registry.register({
  type: SchemaType.AVRO,
  schema: JSON.stringify(orderCreatedSchema)
});

// Producer with schema validation
async function publishWithSchema(event: any): Promise<void> {
  // Encode using schema
  const encodedMessage = await registry.encode(schemaId, event);
  
  await producer.send({
    topic: 'order-events',
    messages: [{
      key: event.customerId,
      value: encodedMessage  // Binary Avro format
    }]
  });
}

// Consumer with automatic deserialization
async function consumeWithSchema(message: any): Promise<any> {
  // Decode using schema registry
  const decodedEvent = await registry.decode(message.value);
  return decodedEvent;
}

// Schema evolution - add optional field (backward compatible)
const orderCreatedSchemaV2 = {
  ...orderCreatedSchema,
  fields: [
    ...orderCreatedSchema.fields,
    { name: 'couponCode', type: ['null', 'string'], default: null }  // Optional
  ]
};

// Old consumers can still read new events (ignore unknown fields)
// New consumers can read old events (use default for missing fields)

Error Handling and Resilience

// Comprehensive error handling strategy
export class ResilientKafkaConsumer {
  private consumer: Consumer;
  private deadLetterProducer: Producer;
  private retryTopics: Map<number, string> = new Map([
    [1, 'order-events.retry-1'],
    [2, 'order-events.retry-2'],
    [3, 'order-events.retry-3']
  ]);

  async processWithRetry(message: any): Promise<void> {
    const retryCount = parseInt(message.headers?.retryCount || '0');
    const maxRetries = 3;

    try {
      await this.processMessage(message);
    } catch (error) {
      if (retryCount < maxRetries) {
        // Send to retry topic with backoff
        await this.sendToRetryTopic(message, retryCount + 1);
      } else {
        // Max retries exceeded, send to DLQ
        await this.sendToDeadLetterQueue(message, error);
      }
    }
  }

  private async sendToRetryTopic(message: any, retryCount: number): Promise<void> {
    const retryTopic = this.retryTopics.get(retryCount);
    const delayMs = Math.pow(2, retryCount) * 1000;  // Exponential backoff

    await this.deadLetterProducer.send({
      topic: retryTopic!,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          ...message.headers,
          retryCount: retryCount.toString(),
          retryAt: (Date.now() + delayMs).toString()
        }
      }]
    });

    console.log(`Sent to retry topic: ${retryTopic}, retry #${retryCount}`);
  }

  private async processMessage(message: any): Promise<void> {
    // Your business logic here
    throw new Error('Simulated failure');
  }
}

Monitoring and Observability

// Kafka metrics collection
import { Counter, Histogram, Gauge } from 'prom-client';

export class KafkaMetrics {
  private messagesProduced = new Counter({
    name: 'kafka_messages_produced_total',
    help: 'Total messages produced',
    labelNames: ['topic']
  });

  private messagesConsumed = new Counter({
    name: 'kafka_messages_consumed_total',
    help: 'Total messages consumed',
    labelNames: ['topic', 'consumer_group']
  });

  private produceDuration = new Histogram({
    name: 'kafka_produce_duration_seconds',
    help: 'Message production duration',
    labelNames: ['topic']
  });

  private consumerLag = new Gauge({
    name: 'kafka_consumer_lag',
    help: 'Consumer lag per partition',
    labelNames: ['topic', 'partition', 'consumer_group']
  });

  recordMessageProduced(topic: string): void {
    this.messagesProduced.inc({ topic });
  }

  recordMessageConsumed(topic: string, groupId: string): void {
    this.messagesConsumed.inc({ topic, consumer_group: groupId });
  }

  async updateConsumerLag(
    admin: Admin,
    groupId: string,
    topic: string
  ): Promise<void> {
    const offsets = await admin.fetchOffsets({ groupId, topics: [topic] });
    const topicOffsets = await admin.fetchTopicOffsets(topic);

    for (const partition of offsets[0].partitions) {
      const high = topicOffsets.find(o => o.partition === partition.partition)?.high || '0';
      const current = partition.offset;
      const lag = parseInt(high) - parseInt(current);

      this.consumerLag.set(
        { topic, partition: partition.partition.toString(), consumer_group: groupId },
        lag
      );
    }
  }
}

Deployment Best Practices

# Production Kafka configuration
# config/server.properties

# Broker settings
broker.id=1
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log settings
log.dirs=/var/kafka-logs
num.partitions=6
default.replication.factor=3
min.insync.replicas=2

# Retention
log.retention.hours=168  # 7 days
log.retention.bytes=-1   # No limit
log.segment.bytes=1073741824  # 1GB segments

# Performance
compression.type=snappy
log.cleanup.policy=delete

# Durability
acks=all
unclean.leader.election.enable=false

# KRaft mode
process.roles=broker,controller
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093

Conclusion

Event-driven architecture with Apache Kafka enables building highly scalable, loosely coupled systems that process data in real-time. The patterns covered—idempotent producers, transactional writes, at-least-once consumers, schema evolution, and comprehensive error handling—provide the foundation for production-grade event-driven systems.

Kafka's durability, scalability to millions of events per second, and ability to replay historical data make it ideal for event sourcing, real-time analytics, change data capture, and microservices communication. With KRaft mode eliminating ZooKeeper complexity and the rich ecosystem of connectors and stream processing tools, Kafka is more accessible than ever.

Start with a simple producer-consumer pattern, add schema registry for type safety, implement proper error handling with DLQ, and monitor consumer lag closely. As your system grows, leverage Kafka Streams or ksqlDB for stream processing, and consider multi-datacenter replication for disaster recovery.

Next Steps

  1. Deploy Kafka cluster using KRaft mode with 3+ brokers for production
  2. Implement idempotent producer for your first event type
  3. Set up Schema Registry to enforce event contracts
  4. Create at-least-once consumer with dead letter queue
  5. Add monitoring for producer/consumer metrics and lag
  6. Design event schemas following domain-driven design principles
  7. Implement retry strategy with exponential backoff and DLQ

Topics Covered

Event-Driven ArchitectureApache KafkaEvent StreamingKafka PatternsDistributed SystemsAsync Messaging

Ready for More?

Explore our comprehensive collection of guides and tutorials to accelerate your tech journey.

Explore All Guides
Weekly Tech Insights

Stay Ahead of the Curve

Join thousands of tech professionals getting weekly insights on AI automation, software architecture, and modern development practices.

No spam, unsubscribe anytimeReal tech insights weekly