Zero Downtime Deployments: A Pipeline That Actually Holds Up

Blue-green deployments, rolling updates, and automated rollback. A working deployment pipeline that lets you ship to production without a maintenance window.

Tharindu Perera · Solutions Architect · 2025-07-22 · Updated 2026-04-19 · 13 min · Advanced

Zero downtime deployments let you ship code to production without your users noticing. No maintenance windows, no "we'll be back shortly" pages, no 3 AM deploy schedules chosen because that's when traffic is lowest.

What zero downtime deployments are

Zero downtime deployments are deployment strategies that:

  • Keep the service available during application updates and infrastructure changes
  • Remove maintenance windows entirely
  • Make continuous delivery possible through automated deployment pipelines
  • Roll back quickly when something goes wrong
  • Plug into high-availability architectures with load balancing and redundancy

The tradeoff with traditional deployments is obvious. Planned downtime means lost revenue, frustrated users, and a deploy schedule that revolves around traffic patterns instead of engineering productivity.

Why the setup cost is worth it

Zero downtime deployments take real infrastructure work upfront. You need health checks, rollback automation, and either a blue-green or rolling update strategy. That investment pays for itself the first time you need to hotfix production at 2 PM on a Tuesday instead of scheduling it for midnight. On AWS, the platform choice drives a lot of the mechanics, which is why my ECS Fargate vs EKS guide is worth reading first.

The practical wins: you deploy when a feature is ready instead of batching releases into weekly windows. Automated health checks catch bad deploys in seconds instead of waiting for user complaints. And the team stops treating deployments as high-stress events, which means more frequent deploys, smaller changes, and fewer things breaking.

Building the pipeline

This section walks through a working deployment pipeline, step by step.

Step 1: blue-green deployment

The first piece builds the image and stands up the green environment alongside blue, on either ECS Fargate or Kubernetes:

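import boto3
import time
import docker
# Alias the module so it does not collide with the `config` dict parameters below
from kubernetes import client, config as k8s_config

class BlueGreenDeployment:
    def __init__(self):
        self.ecs_client = boto3.client('ecs')
        self.elb_client = boto3.client('elbv2')
        self.docker_client = docker.from_env()
        # Load cluster credentials before creating the API client
        # (use k8s_config.load_incluster_config() when running inside a pod)
        k8s_config.load_kube_config()
        self.k8s_client = client.AppsV1Api()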
        
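    def build_and_push_image(self, app_name, version, dockerfile_path, registry):
        """Build the image and push it to the registry the services pull from"""
        # Tag with the registry prefix so the pushed image matches the
        # f"{config['registry']}/{app_name}:{version}" reference used below
        repository = f"{registry}/{app_name}"
        image_name = f"{repository}:{version}"
        
        # Build Docker image (path is the build context directory)
        image, build_logs = self.docker_client.images.build(
            path=dockerfile_path,
            tag=image_name,
            rm=True
        )
        
        # Push to registry; assumes the Docker daemon is already authenticated
        self.docker_client.images.push(repository, tag=version)
        
        return image_name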
    
    def create_green_environment(self, app_name, version, config):
        """Create green environment for new deployment"""
        green_config = config.copy()
        green_config['environment'] = 'green'
        green_config['version'] = version
        
        # Dispatch to the platform-specific implementation
        if config['platform'] == 'ecs':
            return self.create_ecs_green_service(app_name, version, green_config)
        elif config['platform'] == 'kubernetes':
            return self.create_k8s_green_deployment(app_name, version, green_config)
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unsupported platform: {config['platform']}")
    
    def create_ecs_green_service(self, app_name, version, config):
        """Create ECS service for green environment"""
        service_name = f"{app_name}-green-{version}"
        
        # Create task definition
        task_definition = {
            'family': service_name,
            'networkMode': 'awsvpc',
            'requiresCompatibilities': ['FARGATE'],
            'cpu': config['cpu'],
            'memory': config['memory'],
            'executionRoleArn': config['execution_role_arn'],
            'containerDefinitions': [{
                'name': app_name,
                'image': f"{config['registry']}/{app_name}:{version}",
                'portMappings': [{
                    'containerPort': config['port'],
                    'protocol': 'tcp'
                }],
                'environment': config.get('environment_variables', []),
                'logConfiguration': {
                    'logDriver': 'awslogs',
                    'options': {
                        'awslogs-group': f"/ecs/{service_name}",
                        'awslogs-region': config['region'],
                        'awslogs-stream-prefix': 'ecs'
                    }
                }
            }]
        }
        
        # Register task definition
        response = self.ecs_client.register_task_definition(**task_definition)
        task_definition_arn = response['taskDefinition']['taskDefinitionArn']
        
        # Create ECS service
        service_response = self.ecs_client.create_service(
            cluster=config['cluster'],
            serviceName=service_name,
            taskDefinition=task_definition_arn,
            desiredCount=config['desired_count'],
            launchType='FARGATE',
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': config['subnets'],
                    'securityGroups': config['security_groups'],
                    'assignPublicIp': 'ENABLED'
                }
            }
        )
        
        return service_response['service']['serviceArn']
    
    def create_k8s_green_deployment(self, app_name, version, config):
        """Create Kubernetes deployment for green environment"""
        deployment_name = f"{app_name}-green-{version}"
        
        deployment_manifest = {
            'apiVersion': 'apps/v1',
            'kind': 'Deployment',
            'metadata': {
                'name': deployment_name,
                'labels': {
                    'app': app_name,
                    'version': version,
                    'environment': 'green'
                }
            },
            'spec': {
                'replicas': config['replicas'],
                'selector': {
                    'matchLabels': {
                        'app': app_name,
                        'version': version,
                        'environment': 'green'
                    }
                },
                'template': {
                    'metadata': {
                        'labels': {
                            'app': app_name,
                            'version': version,
                            'environment': 'green'
                        }
                    },
                    'spec': {
                        'containers': [{
                            'name': app_name,
                            'image': f"{config['registry']}/{app_name}:{version}",
                            'ports': [{'containerPort': config['port']}],
                            'env': config.get('environment_variables', []),
                            'resources': {
                                'requests': {
                                    'memory': config['memory'],
                                    'cpu': config['cpu']
                                },
                                'limits': {
                                    'memory': config['memory'],
                                    'cpu': config['cpu']
                                }
                            },
                            'livenessProbe': {
                                'httpGet': {
                                    'path': '/health',
                                    'port': config['port']
                                },
                                'initialDelaySeconds': 30,
                                'periodSeconds': 10
                            },
                            'readinessProbe': {
                                'httpGet': {
                                    'path': '/ready',
                                    'port': config['port']
                                },
                                'initialDelaySeconds': 5,
                                'periodSeconds': 5
                            }
                        }]
                    }
                }
            }
        }
        
        # Create deployment
        response = self.k8s_client.create_namespaced_deployment(
            namespace=config['namespace'],
            body=deployment_manifest
        )
        
        return response.metadata.name

Step 2: health checks and validation

Comprehensive health checking and validation before any traffic shifts:

import asyncio
import aiohttp
from prometheus_client import Counter, Histogram, Gauge
import time

class DeploymentHealthChecker:
    def __init__(self):
        self.health_check_counter = Counter('health_checks_total', 'Total health checks', ['status'])
        self.health_check_duration = Histogram('health_check_duration_seconds', 'Health check duration')
        self.service_health_gauge = Gauge('service_health_status', 'Service health status', ['service', 'version'])
        
    async def validate_green_environment(self, app_name, version, config):
        """Validate green environment before switching traffic"""
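        # run_smoke_tests must be provided on this class; it should return
        # {'status': 'passed' | 'failed', ...} like the other two checks
        validation_results = {
            'health_checks': await self.run_health_checks(app_name, version, config),
            'performance_tests': await self.run_performance_tests(app_name, version, config),
            'smoke_tests': await self.run_smoke_tests(app_name, version, config)
        }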
        
        # All validations must pass
        all_passed = all(
            result['status'] == 'passed' 
            for result in validation_results.values()
        )
        
        return {
            'overall_status': 'passed' if all_passed else 'failed',
            'results': validation_results
        }
    
    async def run_health_checks(self, app_name, version, config):
        """Run comprehensive health checks"""
        health_endpoints = [
            '/health',
            '/ready',
            '/metrics',
            '/api/status'
        ]
        
        results = []
        # An explicit ClientTimeout replaces the deprecated bare-number timeout
        timeout = aiohttp.ClientTimeout(total=10)
        # Reuse one session for all endpoints instead of opening one per request
        async with aiohttp.ClientSession(timeout=timeout) as session:
            for endpoint in health_endpoints:
                url = f"http://{app_name}-green-{version}.{config['domain']}{endpoint}"
                try:
                    start_time = time.time()
                    async with session.get(url) as response:
                        duration = time.time() - start_time
                        self.health_check_duration.observe(duration)
                        
                        passed = response.status == 200
                        self.health_check_counter.labels(
                            status='success' if passed else 'failed'
                        ).inc()
                        results.append({
                            'endpoint': endpoint,
                            'status': 'passed' if passed else 'failed',
                            'response_time': duration,
                            'status_code': response.status
                        })
                except Exception as e:
                    self.health_check_counter.labels(status='error').inc()
                    results.append({
                        'endpoint': endpoint,
                        'status': 'error',
                        'error': str(e)
                    })
        
        overall_status = 'passed' if all(r['status'] == 'passed' for r in results) else 'failed'
        return {'status': overall_status, 'checks': results}
    
    async def run_performance_tests(self, app_name, version, config):
        """Run basic performance tests"""
        test_scenarios = [
            {'endpoint': '/api/users', 'concurrent_requests': 10, 'duration': 30},
            {'endpoint': '/api/products', 'concurrent_requests': 20, 'duration': 30},
            {'endpoint': '/api/orders', 'concurrent_requests': 5, 'duration': 30}
        ]
        
        results = []
        for scenario in test_scenarios:
            try:
                performance_result = await self.run_load_test(
                    app_name, version, config, scenario
                )
                results.append(performance_result)
            except Exception as e:
                results.append({
                    'scenario': scenario['endpoint'],
                    'status': 'error',
                    'error': str(e)
                })
        
        overall_status = 'passed' if all(r['status'] == 'passed' for r in results) else 'failed'
        return {'status': overall_status, 'tests': results}
    
    async def run_load_test(self, app_name, version, config, scenario):
        """Run load test for specific scenario"""
        url = f"http://{app_name}-green-{version}.{config['domain']}{scenario['endpoint']}"
        concurrent_requests = scenario['concurrent_requests']
        duration = scenario['duration']
        
        start_time = time.time()
        successful_requests = 0
        failed_requests = 0
        response_times = []
        
        timeout = aiohttp.ClientTimeout(total=10)
        
        async def make_request(session):
            nonlocal successful_requests, failed_requests
            try:
                request_start = time.time()
                async with session.get(url) as response:
                    request_duration = time.time() - request_start
                    response_times.append(request_duration)
                    
                    if response.status == 200:
                        successful_requests += 1
                    else:
                        failed_requests += 1
            except Exception:
                failed_requests += 1
        
        # Run batches of concurrent requests for the specified duration,
        # sharing one session so connections are reused between requests
        async with aiohttp.ClientSession(timeout=timeout) as session:
            while time.time() - start_time < duration:
                tasks = [make_request(session) for _ in range(concurrent_requests)]
                await asyncio.gather(*tasks, return_exceptions=True)
                await asyncio.sleep(1)  # Brief pause between batches
        
        # Calculate metrics
        total_requests = successful_requests + failed_requests
        success_rate = (successful_requests / total_requests * 100) if total_requests > 0 else 0
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        
        # Determine if test passed
        test_passed = (
            success_rate >= 95 and  # 95% success rate
            avg_response_time < 2.0 and  # Under 2 seconds average response time
            total_requests >= concurrent_requests * duration / 10  # Minimum request volume
        )
        
        return {
            'scenario': scenario['endpoint'],
            'status': 'passed' if test_passed else 'failed',
            'success_rate': success_rate,
            'avg_response_time': avg_response_time,
            'total_requests': total_requests
        }
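
validate_green_environment calls run_smoke_tests, which the listing above does not define. Here is a minimal sketch of what it could look like. It assumes a hypothetical fetch coroutine that returns a status code and body for a path, and the checks themselves (paths and expected text) are invented for illustration:

```python
import asyncio

# Hypothetical smoke checks: (path, expected status, text expected in the body)
SMOKE_CHECKS = [
    ('/', 200, ''),
    ('/api/status', 200, 'ok'),
]

async def run_smoke_tests(fetch, checks=SMOKE_CHECKS):
    """Run each check through `fetch` and report in the same shape as the
    other validators: {'status': 'passed' | 'failed', 'tests': [...]}.

    `fetch` is an assumed coroutine: fetch(path) -> (status_code, body_text).
    """
    results = []
    for path, expected_status, expected_text in checks:
        try:
            status_code, body = await fetch(path)
            ok = status_code == expected_status and expected_text in body
            results.append({
                'path': path,
                'status': 'passed' if ok else 'failed',
                'status_code': status_code,
            })
        except Exception as e:
            # A check that cannot run counts as a failure of the whole suite
            results.append({'path': path, 'status': 'error', 'error': str(e)})

    overall = 'passed' if all(r['status'] == 'passed' for r in results) else 'failed'
    return {'status': overall, 'tests': results}
```

Inside DeploymentHealthChecker, fetch would most likely wrap a session.get against the green URL. Keeping it injectable also makes the check easy to exercise with a fake in unit tests.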

Step 3: traffic switching and rollback

Automated traffic switching with instant rollback when something breaks:

class TrafficSwitcher:
    def __init__(self):
        self.ecs_client = boto3.client('ecs')  # used by update_ecs_target_group
        self.elb_client = boto3.client('elbv2')
        self.route53_client = boto3.client('route53')
        
    def switch_traffic_to_green(self, app_name, version, config):
        """Switch traffic from blue to green environment"""
        try:
            # Update load balancer target groups
            if config['platform'] == 'ecs':
                self.update_ecs_target_group(app_name, version, config)
            elif config['platform'] == 'kubernetes':
                self.update_k8s_service(app_name, version, config)
            
            # Update DNS if using Route53
            if config.get('use_route53'):
                self.update_route53_records(app_name, version, config)
            
            return {'status': 'success', 'message': 'Traffic switched to green environment'}
            
        except Exception as e:
            return {'status': 'error', 'message': f'Failed to switch traffic: {str(e)}'}
    
    def rollback_to_blue(self, app_name, config):
        """Rollback to blue environment"""
        try:
            # Switch traffic back to blue
            if config['platform'] == 'ecs':
                self.update_ecs_target_group(app_name, 'blue', config)
            elif config['platform'] == 'kubernetes':
                self.update_k8s_service(app_name, 'blue', config)
            
            # Update DNS back to blue
            if config.get('use_route53'):
                self.update_route53_records(app_name, 'blue', config)
            
            # Clean up green environment
            self.cleanup_green_environment(app_name, config)
            
            return {'status': 'success', 'message': 'Rolled back to blue environment'}
            
        except Exception as e:
            return {'status': 'error', 'message': f'Rollback failed: {str(e)}'}
    
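    def update_ecs_target_group(self, app_name, version, config):
        """Point the target group at the given environment's tasks.

        Assumes an 'ip' target group, which Fargate (awsvpc) tasks require.
        """
        target_group_arn = config['target_group_arn']
        
        # Remember the currently registered targets so they can be drained later
        current_targets = self.elb_client.describe_target_health(
            TargetGroupArn=target_group_arn
        )['TargetHealthDescriptions']
        
        # Resolve the service to switch to
        if version == 'blue':
            service_name = f"{app_name}-blue"
        else:
            service_name = f"{app_name}-green-{version}"
        
        # Get service tasks
        tasks = self.ecs_client.list_tasks(
            cluster=config['cluster'],
            serviceName=service_name
        )['taskArns']
        if not tasks:
            # Never drain the live targets when there is nothing to replace them
            raise RuntimeError(f"No running tasks found for {service_name}")
        
        task_details = self.ecs_client.describe_tasks(
            cluster=config['cluster'],
            tasks=tasks
        )['tasks']
        
        # 'ip' target groups are keyed by the task's private IP, not its ENI ID
        new_targets = []
        for task in task_details:
            for attachment in task['attachments']:
                for detail in attachment['details']:
                    if detail['name'] == 'privateIPv4Address':
                        new_targets.append({
                            'Id': detail['value'],
                            'Port': config['port']
                        })
        if not new_targets:
            raise RuntimeError(f"No task IPs found for {service_name}")
        
        # Register the new targets and wait until they pass health checks
        self.elb_client.register_targets(
            TargetGroupArn=target_group_arn,
            Targets=new_targets
        )
        self.elb_client.get_waiter('target_in_service').wait(
            TargetGroupArn=target_group_arn,
            Targets=new_targets
        )
        
        # Only now drain the old targets, so capacity never drops to zero
        new_keys = {(t['Id'], t['Port']) for t in new_targets}
        old_targets = [
            {'Id': t['Target']['Id'], 'Port': t['Target']['Port']}
            for t in current_targets
            if (t['Target']['Id'], t['Target']['Port']) not in new_keys
        ]
        if old_targets:
            self.elb_client.deregister_targets(
                TargetGroupArn=target_group_arn,
                Targets=old_targets
            )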
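
The three steps only give zero downtime when they run in the right order with a gate between each. A rough sketch of that ordering follows. It assumes four caller-supplied callables (hypothetical wrappers around the classes above) rather than any specific API, and validate is assumed to be a synchronous wrapper, for example around asyncio.run:

```python
def deploy_blue_green(create_green, validate, switch, rollback):
    """Run the blue-green steps in order; roll back on any failure once
    traffic has been touched.

    All four arguments are assumed callables supplied by the caller:
      create_green() -> None
      validate()     -> {'overall_status': 'passed' | 'failed', ...}
      switch()       -> {'status': 'success' | 'error', ...}
      rollback()     -> {'status': 'success' | 'error', ...}
    """
    create_green()

    # Gate 1: never shift traffic to an environment that failed validation
    if validate()['overall_status'] != 'passed':
        return {'status': 'aborted', 'stage': 'pre-switch validation'}

    # Gate 2: a failed or partial switch goes straight back to blue
    if switch()['status'] != 'success':
        rollback()
        return {'status': 'rolled_back', 'stage': 'switch'}

    # Gate 3: validate again now that green is taking production traffic
    if validate()['overall_status'] != 'passed':
        rollback()
        return {'status': 'rolled_back', 'stage': 'post-switch validation'}

    return {'status': 'success', 'stage': 'complete'}
```

Blue stays untouched until the last gate passes, which is what keeps the rollback path cheap.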

Patterns beyond blue-green

1. Canary deployments

Gradual traffic shifting for safer deployments:

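import time

class CanaryDeployment:
    # switch_canary_traffic, check_canary_health and rollback_canary are
    # platform-specific hooks that are not shown here
    def __init__(self):
        # Percentage of traffic sent to the canary at each step
        self.traffic_percentages = [5, 10, 25, 50, 75, 100]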
        
    def deploy_canary(self, app_name, version, config):
        """Deploy using canary strategy with gradual traffic increase"""
        for percentage in self.traffic_percentages:
            # Switch percentage of traffic to canary
            self.switch_canary_traffic(app_name, version, percentage, config)
            
            # Wait and monitor
            time.sleep(300)  # 5 minutes
            
            # Check metrics
            if not self.check_canary_health(app_name, version, config):
                # Rollback if issues detected
                self.rollback_canary(app_name, config)
                return {'status': 'failed', 'message': 'Canary deployment failed'}
        
        # Full deployment successful
        return {'status': 'success', 'message': 'Canary deployment completed'}
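
switch_canary_traffic is not shown above. On an Application Load Balancer, one way to implement it is a weighted forward action across a blue and a green target group. This is a sketch under that assumption: the function name is made up, the ARNs are placeholders, and it only builds the action dict so it can be checked without touching AWS:

```python
def build_weighted_forward_action(blue_tg_arn, green_tg_arn, green_percent):
    """Build an ALB 'forward' action that splits traffic between two target groups.

    The ARNs are supplied by the caller; weights are relative, so blue gets
    whatever share green does not.
    """
    if not 0 <= green_percent <= 100:
        raise ValueError("green_percent must be between 0 and 100")
    return {
        'Type': 'forward',
        'ForwardConfig': {
            'TargetGroups': [
                {'TargetGroupArn': blue_tg_arn, 'Weight': 100 - green_percent},
                {'TargetGroupArn': green_tg_arn, 'Weight': green_percent},
            ]
        },
    }

# Hypothetical usage with boto3 (listener_arn comes from your own config):
# elbv2 = boto3.client('elbv2')
# elbv2.modify_listener(
#     ListenerArn=listener_arn,
#     DefaultActions=[build_weighted_forward_action(blue_arn, green_arn, 5)],
# )
```

Rolling back a canary then becomes the same call with green_percent=0, which is much faster than re-registering targets.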

2. Rolling updates

Rolling updates for Kubernetes deployments:

class RollingUpdateDeployment:
    def __init__(self):
        self.k8s_client = client.AppsV1Api()
        
    def rolling_update(self, app_name, version, config):
        """Perform rolling update deployment"""
        deployment_name = f"{app_name}-blue"
        
        # Update deployment with new image
        deployment = self.k8s_client.read_namespaced_deployment(
            name=deployment_name,
            namespace=config['namespace']
        )
        
        # Update container image
        deployment.spec.template.spec.containers[0].image = f"{config['registry']}/{app_name}:{version}"
        
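        # Configure rolling update strategy: bring up one new pod at a time and
        # never take an old pod down before its replacement is ready
        # (max_unavailable=1 would drop a single-replica deployment to zero)
        deployment.spec.strategy = client.V1DeploymentStrategy(
            type='RollingUpdate',
            rolling_update=client.V1RollingUpdateDeployment(
                max_unavailable=0,
                max_surge=1
            )
        )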
        
        # Apply update
        self.k8s_client.patch_namespaced_deployment(
            name=deployment_name,
            namespace=config['namespace'],
            body=deployment
        )
        
        # Wait for rollout to complete
        self.wait_for_rollout(deployment_name, config['namespace'])
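
wait_for_rollout is called above but not defined. Its core is a completeness check on the Deployment's status counters, sketched here as a pure function. The function name and argument order are illustrative, and the conditions are a common set rather than an official definition:

```python
def rollout_complete(generation, observed_generation, desired, total, updated, available):
    """Return True once a Deployment has fully rolled out.

    Conditions: the controller has seen the latest spec, every replica runs
    the new template, no old replicas remain, and every replica is available.
    Status counters can be None on a fresh Deployment, so treat None as zero.
    """
    total = total or 0
    updated = updated or 0
    available = available or 0
    return (
        observed_generation is not None
        and observed_generation >= generation
        and updated == desired
        and total == updated
        and available == desired
    )

# Hypothetical polling loop using the kubernetes client from the listing above:
# deadline = time.time() + 600
# while time.time() < deadline:
#     d = k8s_client.read_namespaced_deployment_status(name, namespace)
#     if rollout_complete(d.metadata.generation, d.status.observed_generation,
#                         d.spec.replicas, d.status.replicas,
#                         d.status.updated_replicas, d.status.available_replicas):
#         break
#     time.sleep(5)
# else:
#     raise TimeoutError("rollout did not complete in time")
```

The deadline matters: a rollout stuck on a failing readiness probe never completes on its own, and that is the signal to roll back.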

Patterns worth keeping

1. Real testing, not just a green checkmark

Run automated tests at unit, integration, and performance levels before any deploy. A green CI badge from a single unit test suite is not the same thing as a deployable build.

2. Health monitoring that catches problems fast

Set up health checks and monitoring that flag issues in seconds, not minutes.

3. Gradual rollouts

Use canary or rolling deployments to validate changes incrementally instead of betting on a single switch.

4. Automated rollback

Wire rollback triggers to health metrics and error rates so the system reverts before a human can decide.
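
One simple form such a trigger can take is an error-rate check over a sliding window of recent requests. The class name, window size, threshold, and minimum sample count below are all illustrative assumptions, not recommendations:

```python
from collections import deque

class ErrorRateTrigger:
    """Signal a rollback when the error rate over the last N requests is too high."""

    def __init__(self, window=200, threshold=0.05, min_samples=50):
        self.outcomes = deque(maxlen=window)  # True means the request failed
        self.threshold = threshold
        self.min_samples = min_samples

    def record(self, failed):
        """Record one request outcome; old outcomes fall out of the window."""
        self.outcomes.append(bool(failed))

    def error_rate(self):
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

    def should_rollback(self):
        # Require a minimum sample size so one early failure does not trip it
        return (
            len(self.outcomes) >= self.min_samples
            and self.error_rate() > self.threshold
        )
```

In practice the outcomes would more likely come from load balancer or application metrics than from in-process counting, but the decision rule is the same.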

5. Database migrations done carefully

Plan schema changes with backward compatibility and a rollback path. The hardest production incidents in this category are almost always database-shaped.
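
The article does not prescribe a method here. One widely used approach is expand/contract: add the new shape first, let both releases run against it, and remove the old shape in a later deploy. A small sketch, with in-memory SQLite standing in for the real database and invented table and column names:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.execute("INSERT INTO users (name) VALUES ('Ada Lovelace')")

# Expand: add the new column as nullable so the old release's INSERTs,
# which do not know about it, keep working
conn.execute("ALTER TABLE users ADD COLUMN display_name TEXT")

# Old code path (blue) still writes only `name`
conn.execute("INSERT INTO users (name) VALUES ('Grace Hopper')")

# New code path (green) writes both columns
conn.execute(
    "INSERT INTO users (name, display_name) VALUES (?, ?)",
    ('Alan Turing', 'Alan'),
)

# Backfill rows written before or during the rollout
conn.execute("UPDATE users SET display_name = name WHERE display_name IS NULL")

rows = conn.execute("SELECT name, display_name FROM users ORDER BY id").fetchall()

# Contract (dropping `name` or adding NOT NULL) waits for a later deploy,
# once no running release reads or writes the old shape
```

Because the expand step never breaks the old release, rolling the application back does not require rolling the schema back.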

6. Feature flags

Decouple deploys from releases. Ship code dark, flip the flag, and roll back the flag without rolling back the deploy.
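
As a rough illustration of flipping a flag without a deploy, here is a percentage rollout keyed on a stable hash of the user. The function name and bucketing scheme are assumptions for this sketch, and a real setup would read rollout_percent from a flag service or config store:

```python
import hashlib

def flag_enabled(flag_name, user_id, rollout_percent):
    """Return True if `user_id` falls inside the flag's rollout percentage.

    Hashing flag name + user ID gives each user a stable bucket in 0..99,
    so the same user sees the same behavior across requests and deploys.
    """
    digest = hashlib.sha256(f"{flag_name}:{user_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100
    return bucket < rollout_percent
```

Setting rollout_percent to 0 turns the feature off for everyone immediately. Raising it only ever adds users, because each user's bucket does not change.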

7. Runbooks people can find at 3 AM

Maintain clear documentation of deployment procedures and rollback steps. The runbook nobody can find when something breaks is the runbook that doesn't exist.

Deployment considerations

1. Infrastructure capacity

Make sure there's enough capacity to run both blue and green environments at the same time. Blue-green is double the footprint during the switch.

2. Database compatibility

Plan migrations carefully. Backward-compatible schema changes are the price of zero-downtime deploys.

3. Monitoring and alerting

Monitor heavily during and after the deploy. The window between switch and stabilization is where most regressions show up.

4. Team training

Train the team on deployment procedures and emergency response. The first rollback shouldn't be the first time someone runs the rollback script.

Where this actually matters

Zero downtime deployments matter most in:

  • E-commerce platforms, where a bad deploy during Black Friday traffic spikes compounds into real revenue loss
  • Financial services, where banking and payment systems need 24/7 availability
  • SaaS applications, where customers expect continuous availability as part of the contract
  • Gaming platforms, where players don't tolerate maintenance windows
  • Healthcare systems, where the system has to stay up because lives depend on it

Wrapping up

Building zero downtime deployment pipelines takes planning, real testing, and infrastructure that can support the pattern. The strategies above produce deployment systems that hold availability while letting you ship fast.

The whole thing rests on automation, monitoring, and rollback procedures that have actually been tested. Untested rollback automation is just a story.

A reasonable next move

  1. Implement the basic blue-green deployment using the code examples above
  2. Set up health checking and monitoring before you trust any automation
  3. Build automated rollback procedures and test them on purpose
  4. Run the whole pipeline in staging until it's boring

Start with blue-green, add the rest as the system matures.

Topics Covered

Zero Downtime Deployments · Production Deployment · Continuous Delivery · Blue Green Deployment · Rolling Updates · Deployment Pipeline

About the author

Tharindu Perera

Tharindu Perera is a software engineer and solutions architect. He writes Refactix to share patterns from production work across AWS and AI engineering.
