Application Lifecycle
Reliable initialization and shutdown for production services
Reference implementations: OCapistaine (FastAPI + APScheduler), Vaettir (Docker + n8n)
Table of Contents
- Lifecycle Stages
- Startup Sequence
- Running State
- Shutdown Sequence
- Health Checks
- Process Management
- Examples
- Troubleshooting
Lifecycle Stages
┌──────────────────────────────────────────────────────────┐
│                     Lifecycle States                     │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  [INIT] → [STARTING] → [READY] → [RUNNING] → [SHUTDOWN]  │
│    ↓          ↓           ↓          ↓           ↓       │
│             [ERROR]               [FAILED]               │
│                                                          │
└──────────────────────────────────────────────────────────┘
Legend:
- INIT: Process started, before any resource initialization
- STARTING: Initializing resources, not yet ready
- READY: Resources initialized, accepting requests
- RUNNING: Operating normally, processing requests
- SHUTDOWN: Graceful shutdown initiated
- ERROR: Recoverable error, may retry
- FAILED: Fatal error, process will exit
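The states in the legend can be tracked explicitly in code. The following is a minimal sketch, not the OCapistaine implementation: `State`, `ALLOWED` and `Lifecycle` are hypothetical names, and because the diagram does not spell out which states lead to ERROR or FAILED, those edges in the transition table are an illustrative assumption.

```python
from enum import Enum


class State(Enum):
    INIT = "init"
    STARTING = "starting"
    READY = "ready"
    RUNNING = "running"
    SHUTDOWN = "shutdown"
    ERROR = "error"
    FAILED = "failed"


# Assumed transition table: the happy path follows the diagram; the
# ERROR/FAILED edges are an illustrative guess, not taken from the source.
ALLOWED = {
    State.INIT: {State.STARTING, State.FAILED},
    State.STARTING: {State.READY, State.ERROR, State.FAILED},
    State.READY: {State.RUNNING, State.SHUTDOWN, State.ERROR},
    State.RUNNING: {State.SHUTDOWN, State.ERROR},
    State.ERROR: {State.STARTING, State.FAILED},  # recoverable: may retry
    State.SHUTDOWN: {State.FAILED},
    State.FAILED: set(),  # terminal: the process exits
}


class Lifecycle:
    """Tracks the current state and rejects transitions not in ALLOWED."""

    def __init__(self) -> None:
        self.state = State.INIT

    def transition(self, target: State) -> None:
        if target not in ALLOWED[self.state]:
            raise RuntimeError(
                f"Illegal transition {self.state.name} -> {target.name}"
            )
        self.state = target
```

Rejecting unexpected transitions early makes lifecycle bugs (for example, serving requests before READY) show up as explicit errors rather than as silent misbehavior.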
Startup Sequence
Phase 1: Environment Loading
What: Load configuration from environment
Timing: Immediate (< 1s)
Key actions:
- Load the .env file
- Validate required variables
- Set defaults for optional variables
Code:
import os
from dotenv import load_dotenv
# Load environment
load_dotenv()
# Validate required
required_vars = ["REDIS_HOST", "REDIS_PORT"]
for var in required_vars:
    if not os.getenv(var):
        raise ValueError(f"Missing required variable: {var}")
print(f"Environment loaded: {os.getenv('ENVIRONMENT', 'development')}")
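The third key action, setting defaults for optional variables, can be folded into a single typed settings object. This is a sketch under assumptions: `Settings` and `load_settings` are hypothetical names, and `LOG_LEVEL` is an invented optional variable used only to show a default.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    redis_host: str
    redis_port: int
    environment: str
    log_level: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Validate required variables and apply defaults for optional ones."""
    missing = [name for name in ("REDIS_HOST", "REDIS_PORT") if not env.get(name)]
    if missing:
        # Report every missing variable at once instead of failing on the first.
        raise ValueError(f"Missing required variables: {', '.join(missing)}")
    return Settings(
        redis_host=env["REDIS_HOST"],
        redis_port=int(env["REDIS_PORT"]),  # raises ValueError if not numeric
        environment=env.get("ENVIRONMENT", "development"),
        log_level=env.get("LOG_LEVEL", "INFO"),  # LOG_LEVEL is an assumed variable
    )
```

Passing the environment as a mapping keeps the loader testable without touching the real process environment.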
Phase 2: Connection Initialization
What: Connect to external services
Timing: 1-5 seconds
Services:
- Redis (cache, session)
- Database (PostgreSQL, MongoDB)
- Message queues (RabbitMQ, Kafka)
- External APIs (health check only)
Code:
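import logging
import os
import aioredis  # aioredis 1.x API (create_redis_pool)
import asyncpg
logger = logging.getLogger(__name__)
async def init_connections():
    # Redis is optional: log and continue if it is unreachable
    try:
        redis = await aioredis.create_redis_pool(
            f'redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}'
        )
        logger.info("✓ Redis connected")
    except Exception as e:
        redis = None
        logger.warning(f"✗ Redis unavailable (optional): {e}")
    # Database is required: re-raise so startup fails
    try:
        db = await asyncpg.create_pool(os.getenv("DATABASE_URL"))
        logger.info("✓ Database connected")
    except Exception as e:
        logger.error(f"✗ Database connection failed: {e}")
        raise
    return redis, db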
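External services are often still starting when the application boots, so a single connection attempt can fail spuriously. One common mitigation is to retry with exponential backoff. The helper below is a sketch, not part of the reference implementation; `connect_with_retry` and its parameters are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Call connect() until it succeeds, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let startup fail loudly
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

With the defaults, the total wait before giving up is 0.5 + 1 + 2 + 4 = 7.5 seconds, which stays close to the 1-5 second budget stated for this phase only when the service recovers quickly; tune `attempts` and `base_delay` to the deployment.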
Phase 3: Provider Validation
What: Test LLM providers
Timing: 2-10 seconds (depends on provider)
Actions:
- Test primary provider (quick timeout)
- Verify fallback chain available
- Cache provider state
Code:
import asyncio
from app.providers import get_provider
async def init_providers():
    logger.info("Validating LLM providers...")
    provider = get_provider("claude")
    try:
        # Quick test call; asyncio.wait_for enforces the 5 s limit
        # (Message is the project's chat message type)
        await asyncio.wait_for(
            provider.complete(
                messages=[Message(role="user", content="test")],
                timeout=5
            ),
            timeout=5
        )
        logger.info("✓ Claude provider ready")
    except asyncio.TimeoutError:
        logger.warning("⊘ Claude provider timeout, will use fallback")
    except Exception as e:
        logger.warning(f"⊘ Claude provider unavailable: {e}")
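The code above tests only the primary provider. The other two actions, verifying the fallback chain and caching provider state, might look like the sketch below. It is deliberately independent of the project's provider classes: `validate_chain`, `provider_state` and the probe callables are hypothetical, and each probe stands in for a quick test call such as `provider.complete(...)`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional

# Cached result of the last validation pass: provider name -> healthy?
provider_state: Dict[str, bool] = {}


async def validate_chain(
    probes: Dict[str, Callable[[], Awaitable[object]]],
    timeout: float = 5.0,
) -> Optional[str]:
    """Probe providers in order; return the first healthy name, or None."""
    selected: Optional[str] = None
    for name, probe in probes.items():
        try:
            await asyncio.wait_for(probe(), timeout=timeout)
            provider_state[name] = True
        except Exception:  # includes asyncio.TimeoutError
            provider_state[name] = False
        if selected is None and provider_state[name]:
            selected = name
    return selected
```

Probing sequentially keeps the worst case at `len(probes) * timeout` seconds; if that exceeds the startup budget, the probes could instead be run concurrently with `asyncio.gather`.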
Phase 4: Scheduler Startup
What: Start APScheduler and register jobs
Timing: < 1 second
Actions:
- Create AsyncIOScheduler
- Register all scheduled jobs
- Start scheduler
- Verify jobs registered
Code:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
async def start_scheduler():
    logger.info("Starting scheduler...")
    scheduler = AsyncIOScheduler(
        job_defaults={
            "coalesce": True,
            "max_instances": 2,
            "misfire_grace_time": 300,
        }
    )
    # Register jobs
    scheduler.add_job(
        task_prompt_sync,
        CronTrigger(hour=0, minute=0),  # daily at 00:00
        id="task_prompt_sync"
    )
    logger.debug("✓ Registered task_prompt_sync")
    scheduler.add_job(
        task_audierne_docs,
        CronTrigger(minute=20, hour="*/2"),  # every 2 hours at minute 20
        id="task_audierne_docs"
    )
    logger.debug("✓ Registered task_audierne_docs")
    scheduler.start()
    logger.info(f"✓ Scheduler started with {len(scheduler.get_jobs())} jobs")
    return scheduler
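Logging the job count does not by itself confirm that the expected jobs exist. A stricter version of the "verify jobs registered" step could compare job ids, as in this sketch. `missing_jobs` and `require_jobs` are hypothetical helpers; they rely only on `scheduler.get_jobs()` and each job's `id` attribute.

```python
from typing import Iterable, Set


def missing_jobs(scheduler, expected_ids: Iterable[str]) -> Set[str]:
    """Return the expected job ids that the scheduler does not know about."""
    registered = {job.id for job in scheduler.get_jobs()}
    return set(expected_ids) - registered


def require_jobs(scheduler, expected_ids: Iterable[str]) -> None:
    """Raise if any expected job is missing, so startup fails visibly."""
    missing = missing_jobs(scheduler, expected_ids)
    if missing:
        raise RuntimeError(f"Jobs not registered: {sorted(missing)}")
```

Calling `require_jobs(scheduler, {"task_prompt_sync", "task_audierne_docs"})` right after `scheduler.start()` would turn a forgotten registration into a startup failure rather than a silently skipped task.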
Phase 5: Server Listening
What: Start HTTP server and listen for requests
Timing: < 1 second
Code:
# Uvicorn runs the app's lifespan handler on startup and shutdown
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        lifespan="auto"  # Default: run lifespan if the app supports it
    )
Complete Startup Example
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.providers import get_provider
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan context manager.
    Handles startup and shutdown.
    """
    logger = get_logger("presentation")
    # ========================================================================
    # STARTUP (before yield)
    # ========================================================================
    logger.info("=" * 80)
    logger.info("APPLICATION STARTUP")
    logger.info("=" * 80)
    try:
        # Phase 1: Environment
        logger.info("[1/5] Loading environment...")
        from app.providers.config import load_config
        config = load_config()
        logger.info(f"✓ Environment: {config.environment}")
        # Phase 2: Connections
        logger.info("[2/5] Initializing connections...")
        from app.data.redis_client import health_check as redis_health
        if redis_health():
            logger.info("✓ Redis connected")
        else:
            logger.warning("⊘ Redis unavailable (will retry)")
        # Phase 3: Providers
        logger.info("[3/5] Validating providers...")
        provider = get_provider()
        logger.info(f"✓ Provider: {provider.__class__.__name__}")
        # Phase 4: Scheduler
        logger.info("[4/5] Starting scheduler...")
        from app.services.scheduler import start_scheduler
        scheduler = await start_scheduler()
        logger.info(f"✓ Scheduler started with {len(scheduler.get_jobs())} jobs")
        # Phase 5: Listening
        logger.info("[5/5] Server listening...")
        logger.info("✓ Application ready")
        logger.info("=" * 80)
    except Exception as e:
        logger.critical(f"✗ Startup failed: {e}", exc_info=True)
        raise
    yield  # Application runs here
    # ========================================================================
    # SHUTDOWN (after yield)
    # ========================================================================
    logger.info("=" * 80)
    logger.info("APPLICATION SHUTDOWN")
    logger.info("=" * 80)
    try:
        # Stop scheduler
        logger.info("Stopping scheduler...")
        from app.services.scheduler import stop_scheduler
        await stop_scheduler()
        logger.info("✓ Scheduler stopped")
        # Close connections
        logger.info("Closing connections...")
        # redis.close(), db.close(), etc.
        logger.info("✓ Connections closed")
        logger.info("✓ Shutdown complete")
        logger.info("=" * 80)
    except Exception as e:
        logger.error(f"✗ Shutdown error: {e}", exc_info=True)
app = FastAPI(lifespan=lifespan)
Running State
Accepting Requests
Once running, the application:
- Listens on configured port (default: 8000)
- Handles concurrent requests via event loop
- Executes scheduled tasks at configured times
- Maintains connections to external services
- Logs activities to domain-specific logs
Request Handling
Request arrives
│
▼
Middleware processing
│
├─ CORS
├─ Logging
├─ Authentication
└─ Rate limiting
│
▼
Route handler
│
├─ Call agent (if needed)
├─ Access database
├─ Call external APIs
└─ Generate response
│
▼
Response sent
Shutdown Sequence
Signal Handling
Uvicorn handles these signals:
SIGTERM (graceful shutdown)
SIGINT (Ctrl+C)
│
├─→ Stop accepting new connections
├─→ Wait for running requests (timeout: 30s)
├─→ Call lifespan shutdown handler
└─→ Exit process
Phase 1: Stop Accepting Requests
Timing: Immediate
What: Close server socket, reject new connections
Client tries to connect
│
▼
Connection refused
(Server shutting down)
Phase 2: Wait for Running Tasks
Timing: 0-30 seconds (depends on tasks)
What: Wait for all running requests/tasks to complete
Timeout: 30 seconds (set with Uvicorn's --timeout-graceful-shutdown option; without it Uvicorn applies no time limit)
Key consideration: Long-running tasks may not complete
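# Graceful shutdown configuration
task_timeout = 30  # seconds
# If the task runs longer than 30 s, wait_for cancels it and raises
# asyncio.TimeoutError (must be awaited from inside a coroutine)
await asyncio.wait_for(task, timeout=task_timeout)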
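When several background tasks are in flight, the same idea extends to a group: wait up to a deadline, then cancel whatever is left. The sketch below assumes the application keeps references to its own tasks; `drain` is a hypothetical helper, not something Uvicorn or FastAPI provides.

```python
import asyncio
from typing import Iterable, Set


async def drain(tasks: Iterable[asyncio.Task], timeout: float = 30.0) -> Set[asyncio.Task]:
    """Wait up to `timeout` seconds, then cancel whatever is still running.

    Returns the set of tasks that had to be cancelled.
    """
    tasks = set(tasks)
    if not tasks:
        return set()
    _, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Let cancelled tasks run their cleanup (finally blocks) before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return pending
```

Unlike `asyncio.wait_for` on a single task, `asyncio.wait` does not cancel anything on timeout, which is why the explicit `cancel()` loop is needed. Logging the returned set shows which tasks did not finish in time.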
Phase 3: Scheduler Shutdown
Timing: < 1 second
What: Stop APScheduler
async def stop_scheduler():
    if scheduler and scheduler.running:
        logger.info("Stopping scheduler...")
        scheduler.shutdown(wait=True)  # Wait for running jobs
        logger.info("✓ Scheduler stopped")
Phase 4: Resource Cleanup
Timing: < 1 second
What: Close connections, cleanup resources
import shutil
# Close connections
async def cleanup_resources():
    logger.info("Cleaning up resources...")
    # Redis (aioredis 1.x: close() is synchronous, wait_closed() is awaited)
    if redis:
        redis.close()
        await redis.wait_closed()
    # Database (asyncpg pool)
    if db_pool:
        await db_pool.close()
    # Temporary files
    shutil.rmtree(temp_dir, ignore_errors=True)
    logger.info("✓ Resources cleaned")
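Manual cleanup is easy to get wrong when startup fails halfway, because resources opened earlier are never closed. One alternative, shown here as a sketch rather than as the reference implementation, is `contextlib.AsyncExitStack`, which closes registered resources in reverse order even when an exception is raised. `FakeResource`, `closed` and `run_with_cleanup` are stand-ins invented for the example.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import List

closed: List[str] = []  # records close order, for illustration only


class FakeResource:
    """Stand-in for a Redis pool or database pool (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def close(self) -> None:
        closed.append(self.name)


async def run_with_cleanup() -> None:
    async with AsyncExitStack() as stack:
        redis = FakeResource("redis")
        stack.push_async_callback(redis.close)
        db = FakeResource("db")
        stack.push_async_callback(db.close)
        # ... application runs here; leaving the block closes db, then redis,
        # even if an exception was raised after the resources were opened.
```

Inside a FastAPI lifespan function, the `yield` would sit inside the `async with` block so that the stack unwinds during shutdown.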
Complete Shutdown Example
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup...
    await start_scheduler()
    yield
    # Shutdown
    logger.info("Shutdown signal received")
    try:
        # 1. Stop accepting requests (handled by Uvicorn)
        logger.info("Stopping request handler...")
        # 2. Wait for running tasks
        logger.info("Waiting for tasks to complete (timeout: 30s)...")
        from app.services.scheduler import stop_scheduler
        await asyncio.wait_for(
            stop_scheduler(),
            timeout=30
        )
    except asyncio.TimeoutError:
        logger.warning("Tasks didn't complete within timeout, force stopping")
    except Exception as e:
        logger.error(f"Shutdown error: {e}", exc_info=True)
    finally:
        # 3. Close connections even if the scheduler timed out
        logger.info("Closing connections...")
        redis.close()
        await redis.wait_closed()
        await db.close()
        # 4. Cleanup
        logger.info("Cleanup complete")
Health Checks
Startup Health Check
Check if service initialized correctly:
# After starting, verify with health endpoint
curl http://localhost:8000/health
# {"status": "healthy"}
Readiness Probe (Kubernetes)
Returns 200 when ready to accept traffic:
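from fastapi.responses import JSONResponse
@app.get("/health/ready")
async def readiness():
    """Readiness probe for Kubernetes."""
    checks = {
        "redis": bool(await redis.ping()) if redis else False,
        "provider": await test_provider(),
        "scheduler": bool(scheduler and scheduler.running),
    }
    all_ready = all(checks.values())
    # A (body, status) tuple does not set the status code in FastAPI,
    # so return an explicit response
    return JSONResponse(
        status_code=200 if all_ready else 503,
        content={"status": "ready" if all_ready else "not_ready", "checks": checks},
    )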
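A readiness endpoint should answer quickly even when a dependency hangs, otherwise the probe itself times out. The sketch below runs the checks concurrently with a per-check timeout and derives the status code from the results. `run_checks` is a hypothetical helper, and the check callables are placeholders for calls such as a Redis ping or `test_provider()`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple


async def run_checks(
    checks: Dict[str, Callable[[], Awaitable[bool]]],
    timeout: float = 2.0,
) -> Tuple[int, Dict[str, bool]]:
    """Run all checks concurrently; a slow or failing check counts as False."""

    async def guarded(check: Callable[[], Awaitable[bool]]) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout=timeout))
        except Exception:  # includes asyncio.TimeoutError
            return False

    names = list(checks)
    outcomes = await asyncio.gather(*(guarded(checks[name]) for name in names))
    results = dict(zip(names, outcomes))
    return (200 if all(results.values()) else 503), results
```

Because the checks run concurrently, the endpoint's worst-case latency is roughly one `timeout` rather than the sum of all checks.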
Liveness Probe (Kubernetes)
Returns 200 if process is alive:
@app.get("/health/live")
async def liveness():
    """Liveness probe for Kubernetes."""
    return {"status": "alive"}  # FastAPI responds with 200 by default
Process Management
Getting Process Info
# Get PID
ps aux | grep uvicorn | grep -v grep
# Get port info
lsof -i :8000
# Monitor process
watch 'ps aux | grep uvicorn'
Graceful Restart
# 1. Get old PID (listening socket only)
OLD_PID=$(lsof -t -i :8000 -sTCP:LISTEN)
# 2. Start new process (different port temporarily)
poetry run uvicorn app.main:app --port 8001 &
NEW_PID=$!
# 3. Wait for new process ready (-f makes curl fail on HTTP errors)
sleep 5
curl -f http://localhost:8001/health
# 4. Switch traffic (in load balancer)
# 5. Stop old process: SIGTERM first, SIGKILL only if it is still alive
kill -15 $OLD_PID
sleep 10
kill -0 $OLD_PID 2>/dev/null && kill -9 $OLD_PID
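A fixed `sleep 5` before the health check either wastes time or is too short. Polling the health endpoint until it answers is more robust. The following sketch uses only the standard library; `http_ok` and `wait_until_ready` are hypothetical helpers, and the URL in the usage note is the temporary port from the script above.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 2.0) -> bool:
    """Return True if a GET on `url` answers with a 2xx status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or an HTTP error status
        return False


def wait_until_ready(
    probe: Callable[[], bool],
    timeout: float = 60.0,
    interval: float = 1.0,
) -> bool:
    """Poll `probe` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For the restart script, `wait_until_ready(lambda: http_ok("http://localhost:8001/health"))` would replace the sleep and the single curl call; traffic is switched only if it returns True.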
Examples
Example 1: OCapistaine Lifecycle
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
logger = get_logger("presentation")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """OCapistaine lifecycle."""
    # Startup
    logger.info("OCapistaine starting...")
    from app.data.redis_client import health_check as redis_check
    if redis_check():
        logger.info("✓ Redis connected")
    else:
        logger.warning("⊘ Redis unavailable")
    from app.services.scheduler import start_scheduler
    await start_scheduler()
    yield
    # Shutdown
    logger.info("OCapistaine shutting down...")
    from app.services.scheduler import stop_scheduler
    await stop_scheduler()
    logger.info("✓ Shutdown complete")
app = FastAPI(lifespan=lifespan)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", port=8000)
Example 2: Docker Lifecycle
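FROM python:3.12
WORKDIR /app
# Install Poetry, then the dependencies into the system interpreter
COPY pyproject.toml poetry.lock ./
RUN pip install poetry \
    && poetry config virtualenvs.create false \
    && poetry install --no-root
# Copy app
COPY . .
# Health check: urlopen raises on connection errors and HTTP status >= 400
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Run
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]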
Example 3: Kubernetes Lifecycle
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ocapistaine-api
spec:
  replicas: 3
  # apps/v1 requires a selector that matches the pod template labels
  selector:
    matchLabels:
      app: ocapistaine-api
  template:
    metadata:
      labels:
        app: ocapistaine-api
    spec:
      containers:
        - name: api
          image: ocapistaine:latest
          ports:
            - containerPort: 8000
          # Startup probe: wait for app ready
          startupProbe:
            httpGet:
              path: /health/ready
              port: 8000
            failureThreshold: 30
            periodSeconds: 10
          # Readiness probe: can accept traffic
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            periodSeconds: 10
          # Liveness probe: is process alive
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            periodSeconds: 30
          # Graceful shutdown
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 15"]  # Wait for load balancer drain
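The probe and shutdown settings in the manifest above imply two time budgets that are worth checking by hand. The arithmetic below is a sketch: the first three values come from the manifest, while the 30-second application shutdown timeout and the 30-second Kubernetes default for `terminationGracePeriodSeconds` (which the manifest does not set) are stated assumptions.

```python
# Values from the manifest above.
startup_failure_threshold = 30
startup_period_seconds = 10
pre_stop_sleep_seconds = 15

# Assumptions, not stated in the manifest:
app_shutdown_timeout_seconds = 30      # the 30 s task timeout used in this guide
termination_grace_period_seconds = 30  # Kubernetes default when unset

# Longest startup the startup probe tolerates before the container is restarted.
startup_budget = startup_failure_threshold * startup_period_seconds  # 300 s

# The grace period covers the preStop hook *and* the application's own shutdown.
shutdown_needed = pre_stop_sleep_seconds + app_shutdown_timeout_seconds  # 45 s
fits_in_grace_period = shutdown_needed <= termination_grace_period_seconds  # False
```

Under these assumptions the pod may need 45 seconds to stop but is killed after 30, so setting `terminationGracePeriodSeconds` to a value above 45 in the pod spec (or shortening the application timeout) would be one way to close the gap.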
Troubleshooting
Startup Hangs
Symptom: Process starts but never reaches "ready" state
Causes:
- Waiting for external service (Redis, DB) that's not running
- Long-running initialization (e.g., large model loading)
- Deadlock in startup code
Fix:
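# Check which files and sockets the process has open
lsof -p <PID>
# Check logs
tail -f logs/presentation.log
# Bound a diagnostic run: the process is killed after 60 s,
# whether or not startup completed
timeout 60 poetry run uvicorn app.main:app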
Startup Fails Silently
Symptom: Process exits immediately with no error
Cause: Unhandled exception in startup code
Fix:
# Add comprehensive error handling
@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        ...  # startup
        logger.info("Startup complete")
    except Exception as e:
        logger.critical(f"Startup failed: {e}", exc_info=True)
        raise  # Re-raise so process exits with error
    yield
    try:
        ...  # shutdown
    except Exception as e:
        logger.error(f"Shutdown error: {e}", exc_info=True)
Shutdown Takes Too Long
Symptom: Takes 30+ seconds to stop
Cause: Running tasks not completing gracefully
Fix:
# Reduce timeout
await asyncio.wait_for(stop_scheduler(), timeout=10)
# Or check what's running
logger.info(f"Running tasks: {asyncio.all_tasks()}")
for task in asyncio.all_tasks():
    logger.info(f"  - {task.get_name()}")
Checklist
Application lifecycle should:
- Load env: .env file loaded, required vars validated
- Init connections: Redis, DB, APIs tested
- Validate providers: LLM providers working or fallback available
- Start scheduler: APScheduler running, jobs registered
- Report ready: /health endpoint responds 200
- Accept requests: API endpoints functional
- Log activities: Domain loggers operational
- Handle signals: SIGTERM/SIGINT handled gracefully
- Stop scheduler: APScheduler shutdown cleanly
- Close connections: All resources released
- Exit cleanly: Process exits with code 0
References
- FastAPI Lifespan: https://fastapi.tiangolo.com/advanced/events/
- Uvicorn: https://www.uvicorn.org
- Kubernetes Probes: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
- ASGI Spec: https://asgi.readthedocs.io
Last Updated: 2026-02-22 Branch: valkyria Reference Implementation: OCapistaine