Event-driven architecture transforms how distributed systems communicate, enabling real-time data processing with 10-100x lower latency than traditional request-response patterns. Apache Kafka, processing over 1 trillion messages daily across major tech companies, has become the backbone of modern event-driven systems. With KRaft mode production-ready since Kafka 3.3 (removing the ZooKeeper dependency, which Kafka 4.0 drops entirely) and 65% enterprise adoption in 2025, event streaming is now accessible to organizations of all sizes. Teams implementing event-driven patterns report a 70% reduction in system coupling, a 5x improvement in scalability, and 80% faster feature development.
This comprehensive guide shows you how to architect, implement, and operate production-grade event-driven systems using Apache Kafka with real code examples and battle-tested patterns.
Understanding Event-Driven Architecture
Event-driven architecture (EDA) is a design pattern in which systems communicate through events: immutable facts about something that has happened.
Key characteristics:
- Asynchronous communication: Producers publish events without waiting for consumers
- Loose coupling: Producers don't know about consumers; add new consumers without changing producers
- Event log: All events stored in durable log for replay and audit
- Real-time processing: Events processed as they occur, enabling real-time analytics and reactions
- Scalability: Producers and consumers scale independently
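The characteristics above can be illustrated with a small in-memory model. This is only a sketch: `EventLog`, `StoredEvent`, `poll`, and `rewind` are hypothetical names chosen for the example, not part of any Kafka client, and a real log is partitioned, replicated, and persisted to disk.

```typescript
// Minimal in-memory event log: append-only storage plus per-consumer offsets.
// All names here are illustrative, not a Kafka API.
interface StoredEvent<T> {
  offset: number; // position in the log, assigned on append
  payload: T;
}

class EventLog<T> {
  private readonly events: StoredEvent<T>[] = [];
  private readonly offsets = new Map<string, number>(); // consumer -> next offset to read

  // Producers append and return immediately; they never see consumers.
  append(payload: T): number {
    const offset = this.events.length;
    this.events.push({ offset, payload });
    return offset;
  }

  // Each consumer reads from its own position, so consumers are independent.
  poll(consumerId: string, max = 10): StoredEvent<T>[] {
    const start = this.offsets.get(consumerId) ?? 0;
    const batch = this.events.slice(start, start + max);
    this.offsets.set(consumerId, start + batch.length);
    return batch;
  }

  // Replay: rewind a consumer so it re-reads history.
  rewind(consumerId: string, offset = 0): void {
    this.offsets.set(consumerId, offset);
  }
}

const log = new EventLog<string>();
log.append('OrderCreated');
log.append('OrderPaid');

const emailBatch = log.poll('email-service'); // reads both events
const analyticsBatch = log.poll('analytics'); // reads both events independently
log.rewind('email-service');
const replayed = log.poll('email-service'); // the same two events again
```

Adding a new consumer here requires no change to the code that appends, which is the loose-coupling property in miniature.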
Comparison with synchronous patterns:
| Pattern | Latency | Coupling | Scaling | Use Case |
|---|---|---|---|---|
| Synchronous API | 50-500ms | High | Coupled | Request-response, CRUD |
| Event-Driven | 1-50ms | Low | Independent | Real-time, analytics, notifications |
| Message Queue | 10-100ms | Medium | Independent | Task processing, jobs |
Kafka Architecture Fundamentals
Core Concepts
// Topic: Category of events (like a database table)
interface Topic {
name: string; // e.g., "order-events"
partitions: number; // Parallel processing units
replicationFactor: number; // Fault tolerance (copies across brokers)
retentionMs: number; // How long to keep events
}
// Event: Immutable fact with key-value structure
interface KafkaEvent<T> {
key: string; // Determines partition (same key → same partition → ordering)
value: T; // Event payload
timestamp: number; // When event occurred
headers: Record<string, string>; // Metadata
}
// Producer: Publishes events to topics
interface Producer {
send(topic: string, event: KafkaEvent<unknown>): Promise<RecordMetadata>;
sendBatch(topic: string, events: KafkaEvent<unknown>[]): Promise<RecordMetadata[]>;
flush(): Promise<void>;
}
// Consumer: Reads events from topics
interface Consumer {
subscribe(topics: string[]): void;
poll(timeout: number): Promise<ConsumerRecord[]>;
commit(): Promise<void>;
seek(partition: number, offset: number): void;
}
// Consumer Group: Multiple consumers coordinate to process events in parallel
interface ConsumerGroup {
groupId: string; // Identifies the consumer group
members: Consumer[]; // Consumers in the group
// Each partition assigned to only ONE consumer in the group
// Enables parallel processing + fault tolerance
}
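To make the key-to-partition and partition-to-consumer rules concrete, here is a hedged sketch. Both functions are hypothetical helpers written for this example: Kafka's default partitioner uses a murmur2 hash and its group assignors are more sophisticated, so the simple string hash and round-robin below are stand-ins that only preserve the two properties that matter (same key gives same partition; each partition has exactly one owner in the group).

```typescript
// Illustrative key -> partition mapping (NOT Kafka's murmur2 partitioner).
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0; // keep as unsigned 32-bit
  }
  return hash % numPartitions;
}

// Round-robin assignment: every partition goes to exactly one group member.
function assignPartitions(numPartitions: number, members: string[]): Map<string, number[]> {
  if (members.length === 0) throw new Error('A consumer group needs at least one member');
  const assignment = new Map<string, number[]>();
  for (const member of members) assignment.set(member, []);
  for (let p = 0; p < numPartitions; p++) {
    const owner = members[p % members.length];
    assignment.get(owner)!.push(p);
  }
  return assignment;
}

// The same key always lands on the same partition, which preserves per-key order.
const p1 = partitionFor('cust-456', 6);
const p2 = partitionFor('cust-456', 6);

// Six partitions shared by four consumers.
const groupAssignment = assignPartitions(6, ['consumer-a', 'consumer-b', 'consumer-c', 'consumer-d']);
```

One consequence worth noting: with more group members than partitions, the extra members receive no partitions and sit idle, so the partition count caps a group's parallelism.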
Event Flow Example
// Order service publishes event
const orderEvent: OrderCreatedEvent = {
type: 'OrderCreated',
orderId: 'ord-123',
customerId: 'cust-456',
items: [{ productId: 'prod-789', quantity: 2, price: 29.99 }],
totalAmount: 59.98,
createdAt: new Date().toISOString()
};
// Send to Kafka (partitioned by customerId for ordering)
// KafkaJS-style call: each record goes in the `messages` array
await producer.send({
topic: 'order-events',
messages: [{
key: orderEvent.customerId, // Same customer → same partition → ordered
value: JSON.stringify(orderEvent),
headers: {
'event-type': 'OrderCreated',
'version': '1.0',
'source': 'order-service'
}
}]
});
// Multiple consumers can process this event independently:
// 1. Inventory service: Reserve items
// 2. Email service: Send order confirmation
// 3. Analytics service: Update dashboards
// 4. Recommendation service: Update user profile
// 5. Fraud detection: Check for suspicious patterns
// All without the order service knowing about any of them!
Setting Up Production Kafka with KRaft
# docker-compose.yml - Modern Kafka setup (no ZooKeeper!)
version: '3.8'
services:
  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' # Explicit topic creation
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' # Generate with kafka-storage random-uuid
    volumes:
      - kafka-1-data:/var/lib/kafka/data
    networks:
      - kafka-network
  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-2
    environment:
      KAFKA_NODE_ID: 2
      # ... similar config with different NODE_ID
    volumes:
      - kafka-2-data:/var/lib/kafka/data
    networks:
      - kafka-network
  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-3
    environment:
      KAFKA_NODE_ID: 3
      # ... similar config
    volumes:
      - kafka-3-data:/var/lib/kafka/data
    networks:
      - kafka-network
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-1:9092,kafka-2:9092,kafka-3:9092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    ports:
      - '8081:8081'
    networks:
      - kafka-network
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      KAFKA_CLUSTERS_0_NAME: production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    ports:
      - '8080:8080'
    networks:
      - kafka-network
volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
networks:
  kafka-network:
    driver: bridge
Producer Patterns
Pattern 1: Idempotent Producer
// Prevents duplicate events even with retries
import { Kafka, Producer, ProducerRecord, CompressionTypes } from 'kafkajs';
export class IdempotentKafkaProducer {
private producer: Producer;
constructor(brokers: string[]) {
const kafka = new Kafka({
clientId: 'order-service',
brokers: brokers,
retry: {
retries: 5,
initialRetryTime: 100,
factor: 2
}
});
this.producer = kafka.producer({
// Enable idempotence - prevents duplicates
idempotent: true,
// Max in-flight requests (1-5 for strict ordering)
maxInFlightRequests: 5
// Note: in KafkaJS, acks, timeout, and compression are per-request
// options passed to producer.send(), not producer() settings
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async publishEvent<T>(
topic: string,
key: string,
event: T,
headers?: Record<string, string>
): Promise<void> {
try {
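const message: ProducerRecord = {
topic,
// Per-request options: wait for all in-sync replicas, 30s timeout, GZIP
acks: -1,
timeout: 30000,
compression: CompressionTypes.GZIP,
messages: [{
key,
value: JSON.stringify(event),
headers: {
...headers,
timestamp: Date.now().toString(),
producer: 'order-service'
}
}]
};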
const result = await this.producer.send(message);
console.log('Event published:', {
topic,
partition: result[0].partition,
offset: result[0].offset
});
} catch (error) {
console.error('Failed to publish event:', error);
// Idempotent producer will retry automatically
// If all retries fail, handle error appropriately
throw error;
}
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
}
// Usage
const producer = new IdempotentKafkaProducer(['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092']);
await producer.connect();
await producer.publishEvent(
'order-events',
'cust-123', // Key ensures same customer's events go to same partition (ordering)
{
type: 'OrderCreated',
orderId: 'ord-456',
customerId: 'cust-123',
amount: 99.99
},
{ 'event-type': 'OrderCreated', 'version': '1.0' }
);
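Why retries do not create duplicates can be sketched with a simplified broker-side model. The assumption here is the core idea only: the broker remembers the last sequence number it accepted from each producer for a partition and ignores anything it has already seen. `PartitionLog` and `AppendResult` are hypothetical names; a real broker tracks sequences per record batch and keeps more state than this.

```typescript
type AppendResult = 'appended' | 'duplicate' | 'out_of_order';

// Simplified model of one partition that deduplicates by (producerId, sequence).
class PartitionLog {
  readonly records: string[] = [];
  private readonly lastSeq = new Map<number, number>(); // producerId -> last accepted sequence

  append(producerId: number, sequence: number, value: string): AppendResult {
    const last = this.lastSeq.get(producerId) ?? -1;
    if (sequence <= last) return 'duplicate'; // retry of a record that was already written
    if (sequence !== last + 1) return 'out_of_order'; // gap: an earlier record is missing
    this.records.push(value);
    this.lastSeq.set(producerId, sequence);
    return 'appended';
  }
}

const partition = new PartitionLog();
const first = partition.append(7, 0, 'OrderCreated');
const retry = partition.append(7, 0, 'OrderCreated'); // network retry of the same send
const second = partition.append(7, 1, 'OrderPaid');
```

The retry is acknowledged without being written a second time, so the log ends up with two records rather than three.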
Pattern 2: Transactional Producer
// Atomic writes across multiple topics (all-or-nothing)
export class TransactionalKafkaProducer {
private producer: Producer;
constructor(brokers: string[]) {
const kafka = new Kafka({
clientId: 'payment-service',
brokers
});
this.producer = kafka.producer({
idempotent: true,
// Setting transactionalId is what enables transactions in KafkaJS
transactionalId: `payment-service-${process.env.HOSTNAME || 'local'}`,
maxInFlightRequests: 1 // Required for transactions
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async publishAtomically(operations: Array<{
topic: string;
key: string;
value: any;
}>): Promise<void> {
// Begin transaction
const transaction = await this.producer.transaction();
try {
// Send all messages within transaction
for (const op of operations) {
await transaction.send({
topic: op.topic,
messages: [{
key: op.key,
value: JSON.stringify(op.value)
}]
});
}
// Commit transaction (all messages visible atomically)
await transaction.commit();
console.log('Transaction committed:', operations.length, 'messages');
} catch (error) {
// Abort transaction (no messages visible)
await transaction.abort();
console.error('Transaction aborted:', error);
throw error;
}
}
}
// Usage: Process payment and publish multiple events atomically
const producer = new TransactionalKafkaProducer(['kafka-1:9092']);
await producer.connect();
// Either ALL of these are published, or NONE
await producer.publishAtomically([
{
topic: 'payment-events',
key: 'payment-789',
value: { type: 'PaymentProcessed', paymentId: 'payment-789', amount: 99.99 }
},
{
topic: 'order-events',
key: 'ord-456',
value: { type: 'OrderPaid', orderId: 'ord-456', paymentId: 'payment-789' }
},
{
topic: 'accounting-events',
key: 'cust-123',
value: { type: 'RevenueRecorded', customerId: 'cust-123', amount: 99.99 }
}
]);
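The all-or-nothing behaviour can be pictured with a toy model of commit and abort. This is a sketch under simplifying assumptions: `TransactionalLog` is a hypothetical class, and it models only the visibility rule seen by consumers that read committed data. Real Kafka writes transaction markers into the log and involves a transaction coordinator, none of which appears here.

```typescript
type TxState = 'open' | 'committed' | 'aborted';
interface TxRecord { txId: number; topic: string; value: string; }

class TransactionalLog {
  private readonly records: TxRecord[] = [];
  private readonly states = new Map<number, TxState>();
  private nextTxId = 0;

  begin(): number {
    const txId = this.nextTxId++;
    this.states.set(txId, 'open');
    return txId;
  }

  send(txId: number, topic: string, value: string): void {
    this.requireOpen(txId);
    this.records.push({ txId, topic, value }); // written, but not yet visible
  }

  commit(txId: number): void {
    this.requireOpen(txId);
    this.states.set(txId, 'committed');
  }

  abort(txId: number): void {
    this.requireOpen(txId);
    this.states.set(txId, 'aborted');
  }

  // What a consumer reading only committed data would see for one topic.
  readCommitted(topic: string): string[] {
    return this.records
      .filter((r) => r.topic === topic && this.states.get(r.txId) === 'committed')
      .map((r) => r.value);
  }

  private requireOpen(txId: number): void {
    if (this.states.get(txId) !== 'open') throw new Error(`Transaction ${txId} is not open`);
  }
}

const txLog = new TransactionalLog();
const okTx = txLog.begin();
txLog.send(okTx, 'payment-events', 'PaymentProcessed');
txLog.send(okTx, 'order-events', 'OrderPaid');
txLog.commit(okTx); // both records become visible together

const failedTx = txLog.begin();
txLog.send(failedTx, 'payment-events', 'PaymentProcessed (failed attempt)');
txLog.abort(failedTx); // this record is never visible
```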
Pattern 3: Batch Producer with Compression
// High-throughput batch publishing
export class BatchKafkaProducer {
private producer: Producer;
private batch: Array<{ topic: string; key: string; value: any }> = [];
private batchSize = 100;
private flushInterval: NodeJS.Timeout;
constructor(brokers: string[], batchSize = 100, flushIntervalMs = 1000) {
const kafka = new Kafka({
clientId: 'analytics-service',
brokers
});
this.producer = kafka.producer({
idempotent: true
// KafkaJS has no linger or batch-size producer settings; batching is done
// by this class, and compression is chosen per send() call
});
this.batchSize = batchSize;
// Auto-flush on interval
this.flushInterval = setInterval(() => {
this.flush();
}, flushIntervalMs);
}
async connect(): Promise<void> {
await this.producer.connect();
}
addToBatch(topic: string, key: string, value: any): void {
this.batch.push({ topic, key, value });
if (this.batch.length >= this.batchSize) {
this.flush();
}
}
async flush(): Promise<void> {
if (this.batch.length === 0) return;
const messagesToSend = [...this.batch];
this.batch = [];
try {
// Group by topic for efficient sending
const byTopic = new Map<string, Array<{ key: string; value: any }>>();
for (const msg of messagesToSend) {
if (!byTopic.has(msg.topic)) {
byTopic.set(msg.topic, []);
}
byTopic.get(msg.topic)!.push({
key: msg.key,
value: msg.value
});
}
// Send all topics
await Promise.all(
Array.from(byTopic.entries()).map(([topic, messages]) =>
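this.producer.send({
topic,
// GZIP is built into KafkaJS; Snappy needs a separately registered codec
compression: CompressionTypes.GZIP,
messages: messages.map(m => ({
key: m.key,
value: JSON.stringify(m.value)
}))
})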
)
);
console.log(`Flushed ${messagesToSend.length} messages`);
} catch (error) {
console.error('Flush failed:', error);
// Re-add to batch for retry
this.batch.unshift(...messagesToSend);
}
}
async disconnect(): Promise<void> {
clearInterval(this.flushInterval);
await this.flush(); // Final flush
await this.producer.disconnect();
}
}
// Usage: High-volume analytics events
const producer = new BatchKafkaProducer(['kafka-1:9092'], 500, 2000);
await producer.connect();
// Add many events efficiently
for (let i = 0; i < 10000; i++) {
producer.addToBatch(
'page-view-events',
`user-${i % 100}`,
{
type: 'PageView',
userId: `user-${i % 100}`,
page: '/products',
timestamp: Date.now()
}
);
}
// Batches automatically sent every 2 seconds or when 500 events accumulated
Consumer Patterns
Pattern 1: At-Least-Once Consumer
// Guaranteed processing with manual offset management
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
export class AtLeastOnceConsumer {
private consumer: Consumer;
constructor(brokers: string[], groupId: string) {
const kafka = new Kafka({
clientId: 'inventory-service',
brokers
});
this.consumer = kafka.consumer({
groupId,
// Failure-detection settings for group membership
sessionTimeout: 30000,
heartbeatInterval: 3000
// Note: autoCommit is an option of consumer.run() in KafkaJS, not of consumer()
});
}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.subscribe({
topics,
// Only applies when the group has no committed offset:
// false = start at the latest offset, true = start at the earliest
fromBeginning: false
});
}
async startConsuming(handler: (message: any) => Promise<void>): Promise<void> {
// Redelivered messages carry their original headers, so attempts are
// counted in memory per topic/partition/offset
const attempts = new Map<string, number>();
const maxAttempts = 3;
await this.consumer.run({
autoCommit: false, // Commit manually, only after successful processing
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const attemptKey = `${topic}-${partition}-${message.offset}`;
const nextOffset = (parseInt(message.offset, 10) + 1).toString();
try {
// Parse inside the try block so malformed payloads also reach the DLQ
const event = JSON.parse(message.value?.toString() || '{}');
console.log('Processing event:', {
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
event: event.type
});
// Process the message
await handler(event);
// ONLY commit after successful processing
await this.consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);
attempts.delete(attemptKey);
console.log('Event processed and committed');
} catch (error) {
console.error('Processing failed:', error);
const attempt = (attempts.get(attemptKey) ?? 0) + 1;
attempts.set(attemptKey, attempt);
if (attempt >= maxAttempts) {
// Poison pill: park it in the DLQ, then commit to move past it
await this.sendToDeadLetterQueue(topic, message, error);
await this.consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);
attempts.delete(attemptKey);
return;
}
// Rethrow: swallowing the error would let KafkaJS move on to the next
// message, and its commit would skip this one
throw error;
}
}
});
}
private async sendToDeadLetterQueue(
originalTopic: string,
message: any,
error: any
): Promise<void> {
// Send to DLQ topic for manual investigation
const dlqProducer = new IdempotentKafkaProducer(['kafka-1:9092']);
await dlqProducer.connect();
await dlqProducer.publishEvent(
`${originalTopic}.dlq`,
message.key?.toString() || 'unknown',
{
// Keep the raw payload: a poison pill may not be valid JSON
originalMessage: message.value?.toString() ?? null,
error: error.message,
failedAt: new Date().toISOString()
}
);
await dlqProducer.disconnect();
}
async disconnect(): Promise<void> {
await this.consumer.disconnect();
}
}
// Usage
const consumer = new AtLeastOnceConsumer(['kafka-1:9092'], 'inventory-service-group');
await consumer.connect();
await consumer.subscribe(['order-events']);
await consumer.startConsuming(async (event) => {
if (event.type === 'OrderCreated') {
// Reserve inventory
await inventoryService.reserve(event.orderId, event.items);
}
});
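Because an at-least-once consumer can see the same event more than once (for example, when processing succeeds but the commit fails), the handler itself should tolerate redelivery. Below is a minimal sketch of one way to do that, deduplicating on `orderId`. `InventoryReservations` is a hypothetical stand-in for the `inventoryService` used above, and the in-memory `Set` is an assumption made for brevity; a real service would persist the processed IDs together with its state.

```typescript
interface OrderCreated {
  orderId: string;
  items: { productId: string; quantity: number }[];
}

class InventoryReservations {
  private readonly seenOrders = new Set<string>();
  private readonly reserved = new Map<string, number>(); // productId -> reserved units

  // Returns false when the event was already applied (a redelivery).
  apply(event: OrderCreated): boolean {
    if (this.seenOrders.has(event.orderId)) return false;
    for (const item of event.items) {
      const current = this.reserved.get(item.productId) ?? 0;
      this.reserved.set(item.productId, current + item.quantity);
    }
    this.seenOrders.add(event.orderId); // mark as processed only after applying
    return true;
  }

  reservedUnits(productId: string): number {
    return this.reserved.get(productId) ?? 0;
  }
}

const reservations = new InventoryReservations();
const orderEvent: OrderCreated = {
  orderId: 'ord-123',
  items: [{ productId: 'prod-789', quantity: 2 }]
};

const appliedFirst = reservations.apply(orderEvent); // applied
const appliedAgain = reservations.apply(orderEvent); // redelivery, ignored
```

With this in place, reprocessing after a crash or rebalance leaves the reserved quantity unchanged instead of reserving the same items twice.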
Pattern 2: Parallel Consumer with Batch Processing
// Process events in parallel within partition
export class ParallelBatchConsumer {
private consumer: Consumer;
constructor(brokers: string[], groupId: string, private batchSize = 10) {
const kafka = new Kafka({ clientId: 'email-service', brokers });
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 60000, // Longer timeout for batch processing
maxWaitTimeInMs: 1000, // Wait up to 1s for batch
maxBytesPerPartition: 1048576 // 1MB per partition per fetch
});
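}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.subscribe({ topics, fromBeginning: false });
}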
async startConsuming(handler: (events: any[]) => Promise<void>): Promise<void> {
await this.consumer.run({
// Resolve offsets manually so a failed chunk is not marked as processed
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
const messages = batch.messages;
console.log(`Processing batch: ${messages.length} messages`);
// Process in chunks
for (let i = 0; i < messages.length; i += this.batchSize) {
const chunk = messages.slice(i, i + this.batchSize);
// Parse events
const events = chunk.map(m =>
JSON.parse(m.value?.toString() || '{}')
);
try {
// Process batch in parallel
await handler(events);
// Mark the chunk as processed; auto-commit picks up resolved offsets
const lastMessage = chunk[chunk.length - 1];
resolveOffset(lastMessage.offset);
await heartbeat(); // Keep session alive
} catch (error) {
console.error('Batch processing failed:', error);
// Rethrow so the unresolved messages are fetched again
throw error;
}
}
}
});
}
}
// Usage: Send batch of emails efficiently
// (the consumer must be connected and subscribed to a topic before startConsuming is called)
const consumer = new ParallelBatchConsumer(['kafka-1:9092'], 'email-service-group', 50);
await consumer.startConsuming(async (events) => {
// Send 50 emails in parallel
await Promise.all(events.map(event =>
emailService.send({
to: event.email,
subject: event.subject,
body: event.body
})
));
console.log(`Sent ${events.length} emails`);
});
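One caveat with `Promise.all` in the usage above: a single rejected send fails the whole chunk, so a retry would resend emails that already went out. A possible refinement, sketched here under assumptions, is to collect per-item outcomes and retry or dead-letter only the failures. `processChunk` and `fakeSend` are hypothetical helpers; `fakeSend` merely stands in for `emailService.send`.

```typescript
interface ChunkResult<T> {
  succeeded: T[];
  failed: { item: T; reason: string }[];
}

// Runs the handler for every item in parallel and records each outcome
// instead of failing the whole chunk on the first rejection.
async function processChunk<T>(
  items: T[],
  handler: (item: T) => Promise<void>
): Promise<ChunkResult<T>> {
  const result: ChunkResult<T> = { succeeded: [], failed: [] };
  await Promise.all(
    items.map(async (item) => {
      try {
        await handler(item);
        result.succeeded.push(item);
      } catch (error) {
        const reason = error instanceof Error ? error.message : String(error);
        result.failed.push({ item, reason });
      }
    })
  );
  return result;
}

// Hypothetical sender that rejects one address.
async function fakeSend(to: string): Promise<void> {
  if (to === 'bad@example.com') throw new Error('mailbox unavailable');
}

processChunk(['a@example.com', 'bad@example.com', 'c@example.com'], fakeSend)
  .then((r) => console.log(`sent=${r.succeeded.length} failed=${r.failed.length}`));
```

The failed items can then be routed to a retry or dead letter topic while the offset for the chunk is still resolved.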
Schema Management with Avro
// Schema Registry for event schema evolution
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';
const registry = new SchemaRegistry({
host: 'http://schema-registry:8081'
});
// Define Avro schema
const orderCreatedSchema = {
type: 'record',
name: 'OrderCreated',
namespace: 'com.company.orders',
fields: [
{ name: 'orderId', type: 'string' },
{ name: 'customerId', type: 'string' },
{ name: 'totalAmount', type: 'double' },
{ name: 'items', type: {
type: 'array',
items: {
type: 'record',
name: 'OrderItem',
fields: [
{ name: 'productId', type: 'string' },
{ name: 'quantity', type: 'int' },
{ name: 'price', type: 'double' }
]
}
}},
{ name: 'createdAt', type: 'string' }
]
};
// Register schema
const { id: schemaId } = await registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(orderCreatedSchema)
});
// Producer with schema validation
async function publishWithSchema(event: any): Promise<void> {
// Encode using schema
const encodedMessage = await registry.encode(schemaId, event);
await producer.send({
topic: 'order-events',
messages: [{
key: event.customerId,
value: encodedMessage // Binary Avro format
}]
});
}
// Consumer with automatic deserialization
async function consumeWithSchema(message: any): Promise<any> {
// Decode using schema registry
const decodedEvent = await registry.decode(message.value);
return decodedEvent;
}
// Schema evolution - add optional field (backward compatible)
const orderCreatedSchemaV2 = {
...orderCreatedSchema,
fields: [
...orderCreatedSchema.fields,
{ name: 'couponCode', type: ['null', 'string'], default: null } // Optional
]
};
// Old consumers can still read new events (the extra field is ignored)
// New consumers can read old events (the default fills in the missing field)
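The two compatibility rules in the comments above can be sketched as a projection of a decoded record onto the reader's field list. This is a simplified model, not how Avro is implemented: real schema resolution works on the binary encoding with both the writer and reader schemas and also handles type promotion. `FieldSpec` and `resolveRecord` are hypothetical names for this example.

```typescript
interface FieldSpec {
  name: string;
  default?: unknown; // present only when the schema declares a default
}

// Project a decoded record onto the reader's fields:
// unknown fields are dropped, missing fields take the reader's default.
function resolveRecord(
  record: Record<string, unknown>,
  readerFields: FieldSpec[]
): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const field of readerFields) {
    if (field.name in record) {
      out[field.name] = record[field.name];
    } else if ('default' in field) {
      out[field.name] = field.default;
    } else {
      throw new Error(`Missing field "${field.name}" and no default declared`);
    }
  }
  return out;
}

const v1Fields: FieldSpec[] = [{ name: 'orderId' }, { name: 'totalAmount' }];
const v2Fields: FieldSpec[] = [...v1Fields, { name: 'couponCode', default: null }];

const oldEvent = { orderId: 'ord-1', totalAmount: 59.98 };
const newEvent = { orderId: 'ord-2', totalAmount: 10, couponCode: 'SAVE10' };

const oldReadByNew = resolveRecord(oldEvent, v2Fields); // couponCode filled with null
const newReadByOld = resolveRecord(newEvent, v1Fields); // couponCode dropped
```

The error branch shows why a new field without a default breaks compatibility: a reader on the new schema has nothing to substitute when it meets an old event.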
Error Handling and Resilience
// Comprehensive error handling strategy
export class ResilientKafkaConsumer {
private consumer: Consumer;
private deadLetterProducer: Producer;
private retryTopics: Map<number, string> = new Map([
[1, 'order-events.retry-1'],
[2, 'order-events.retry-2'],
[3, 'order-events.retry-3']
]);
async processWithRetry(message: any): Promise<void> {
// Header values arrive as Buffers, so convert before parsing
const retryCount = parseInt(message.headers?.retryCount?.toString() || '0', 10);
const maxRetries = 3;
try {
await this.processMessage(message);
} catch (error) {
if (retryCount < maxRetries) {
// Send to retry topic with backoff
await this.sendToRetryTopic(message, retryCount + 1);
} else {
// Max retries exceeded, send to DLQ
await this.sendToDeadLetterQueue(message, error);
}
}
}
private async sendToRetryTopic(message: any, retryCount: number): Promise<void> {
const retryTopic = this.retryTopics.get(retryCount);
const delayMs = Math.pow(2, retryCount) * 1000; // Exponential backoff: 2s, 4s, 8s
await this.deadLetterProducer.send({
topic: retryTopic!,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
retryCount: retryCount.toString(),
retryAt: (Date.now() + delayMs).toString()
}
}]
});
console.log(`Sent to retry topic: ${retryTopic}, retry #${retryCount}`);
}
private async sendToDeadLetterQueue(message: any, error: unknown): Promise<void> {
// Park the message for manual investigation once retries are exhausted
await this.deadLetterProducer.send({
topic: 'order-events.dlq',
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
error: error instanceof Error ? error.message : String(error),
failedAt: new Date().toISOString()
}
}]
});
}
private async processMessage(message: any): Promise<void> {
// Your business logic here
throw new Error('Simulated failure');
}
}
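The routing and backoff arithmetic in the class above can be isolated into pure functions, which also shows the piece the class leaves implicit: whoever consumes a retry topic has to honour the `retryAt` header before processing. The sketch below reuses the same formula and topic naming; `nextTopic` and `remainingDelayMs` are hypothetical helpers, and the `.dlq` suffix is assumed from the naming used earlier in this guide.

```typescript
const MAX_RETRIES = 3;

// Same formula as sendToRetryTopic: 2^retryCount seconds.
function retryDelayMs(retryCount: number): number {
  return Math.pow(2, retryCount) * 1000;
}

// Where a failed message goes next, given how many retries it has already had.
function nextTopic(baseTopic: string, retryCount: number): string {
  return retryCount < MAX_RETRIES
    ? `${baseTopic}.retry-${retryCount + 1}`
    : `${baseTopic}.dlq`;
}

// How long a retry-topic consumer should still wait before processing.
function remainingDelayMs(retryAtHeader: string | undefined, now: number): number {
  if (retryAtHeader === undefined) return 0;
  const retryAt = Number(retryAtHeader);
  return Number.isFinite(retryAt) ? Math.max(0, retryAt - now) : 0;
}

const firstRetryTopic = nextTopic('order-events', 0); // order-events.retry-1
const exhaustedTopic = nextTopic('order-events', 3); // order-events.dlq
const waitMs = remainingDelayMs('5000', 3000); // 2000 ms still to wait
```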
Monitoring and Observability
// Kafka metrics collection
import { Counter, Histogram, Gauge } from 'prom-client';
import { Admin } from 'kafkajs';
export class KafkaMetrics {
private messagesProduced = new Counter({
name: 'kafka_messages_produced_total',
help: 'Total messages produced',
labelNames: ['topic']
});
private messagesConsumed = new Counter({
name: 'kafka_messages_consumed_total',
help: 'Total messages consumed',
labelNames: ['topic', 'consumer_group']
});
private produceDuration = new Histogram({
name: 'kafka_produce_duration_seconds',
help: 'Message production duration',
labelNames: ['topic']
});
private consumerLag = new Gauge({
name: 'kafka_consumer_lag',
help: 'Consumer lag per partition',
labelNames: ['topic', 'partition', 'consumer_group']
});
recordMessageProduced(topic: string): void {
this.messagesProduced.inc({ topic });
}
recordMessageConsumed(topic: string, groupId: string): void {
this.messagesConsumed.inc({ topic, consumer_group: groupId });
}
async updateConsumerLag(
admin: Admin,
groupId: string,
topic: string
): Promise<void> {
const offsets = await admin.fetchOffsets({ groupId, topics: [topic] });
const topicOffsets = await admin.fetchTopicOffsets(topic);
for (const partition of offsets[0].partitions) {
const high = topicOffsets.find(o => o.partition === partition.partition)?.high || '0';
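// A committed offset of -1 means the group has not committed yet
const current = Math.max(0, parseInt(partition.offset, 10));
const lag = Math.max(0, parseInt(high, 10) - current);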
this.consumerLag.set(
{ topic, partition: partition.partition.toString(), consumer_group: groupId },
lag
);
}
}
}
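The lag calculation inside `updateConsumerLag` is easier to test when it is separated from the admin client. Here is a sketch of that arithmetic as a pure function. `PartitionOffset` and `computeLag` are hypothetical names, and treating a committed offset of `-1` as "nothing read yet, starting from offset 0" is a simplifying assumption that ignores the partition's earliest retained offset.

```typescript
interface PartitionOffset {
  partition: number;
  offset: string; // offsets are passed as strings, as in the class above
}

// Lag per partition = high watermark - committed offset, never negative.
function computeLag(
  committed: PartitionOffset[],
  highWatermarks: PartitionOffset[]
): Map<number, number> {
  const high = new Map<number, number>();
  for (const h of highWatermarks) high.set(h.partition, Number(h.offset));

  const lag = new Map<number, number>();
  for (const c of committed) {
    const end = high.get(c.partition) ?? 0;
    const current = Math.max(0, Number(c.offset)); // -1 means no commit yet
    lag.set(c.partition, Math.max(0, end - current));
  }
  return lag;
}

const lagByPartition = computeLag(
  [{ partition: 0, offset: '90' }, { partition: 1, offset: '-1' }],
  [{ partition: 0, offset: '100' }, { partition: 1, offset: '40' }]
);
// Partition 0 is 10 messages behind; partition 1 has not been read at all.
```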
Deployment Best Practices
# Production Kafka configuration
# config/server.properties
# Broker settings
# KRaft mode uses node.id in place of broker.id
node.id=1
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log settings
log.dirs=/var/kafka-logs
num.partitions=6
default.replication.factor=3
min.insync.replicas=2
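# Retention
# (comments go on their own lines: .properties files treat trailing text as part of the value)
# Keep data for 7 days
log.retention.hours=168
# No size-based limit
log.retention.bytes=-1
# 1GB segments
log.segment.bytes=1073741824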
# Performance
compression.type=snappy
log.cleanup.policy=delete
# Durability
# acks=all is a producer setting, not a broker property; set it in the client
unclean.leader.election.enable=false
# KRaft mode
process.roles=broker,controller
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
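How much failure this configuration tolerates follows from simple arithmetic on the replication settings and the controller quorum size. The helpers below are a hypothetical sketch of that reasoning, assuming producers use `acks=all`; they do not read any real cluster state.

```typescript
interface DurabilityConfig {
  replicationFactor: number;
  minInsyncReplicas: number;
}

// Replicas of a partition that can be offline while acks=all writes still succeed.
function tolerableFailuresForWrites(config: DurabilityConfig): number {
  if (config.minInsyncReplicas > config.replicationFactor) {
    throw new Error('min.insync.replicas cannot exceed the replication factor');
  }
  return config.replicationFactor - config.minInsyncReplicas;
}

// A KRaft controller quorum needs a majority of voters to stay available.
function tolerableControllerFailures(voters: number): number {
  return Math.floor((voters - 1) / 2);
}

// Values from the configuration above: replication factor 3, min ISR 2, 3 voters.
const writeTolerance = tolerableFailuresForWrites({ replicationFactor: 3, minInsyncReplicas: 2 });
const controllerTolerance = tolerableControllerFailures(3);
```

With these settings, one broker can be lost without blocking writes or losing the controller quorum; raising `min.insync.replicas` to 3 would make writes fail as soon as any replica is unavailable.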
Conclusion
Event-driven architecture with Apache Kafka enables building highly scalable, loosely coupled systems that process data in real time. The patterns covered (idempotent producers, transactional writes, at-least-once consumers, schema evolution, and comprehensive error handling) provide the foundation for production-grade event-driven systems.
Kafka's durability, scalability to millions of events per second, and ability to replay historical data make it ideal for event sourcing, real-time analytics, change data capture, and microservices communication. With KRaft mode eliminating ZooKeeper complexity and the rich ecosystem of connectors and stream processing tools, Kafka is more accessible than ever.
Start with a simple producer-consumer pattern, add schema registry for type safety, implement proper error handling with DLQ, and monitor consumer lag closely. As your system grows, leverage Kafka Streams or ksqlDB for stream processing, and consider multi-datacenter replication for disaster recovery.
Next Steps
- Deploy Kafka cluster using KRaft mode with 3+ brokers for production
- Implement idempotent producer for your first event type
- Set up Schema Registry to enforce event contracts
- Create at-least-once consumer with dead letter queue
- Add monitoring for producer/consumer metrics and lag
- Design event schemas following domain-driven design principles
- Implement retry strategy with exponential backoff and DLQ