Logging Integration
Observable async services with structured logging and correlation
Reference implementations: OCapistaine (domain loggers), Vaettir (task logging), APScheduler coordination
Table of Contents
- Architecture
- Domain-Based Logging
- Structured Logging
- APScheduler Integration
- Correlation IDs
- Log Rotation
- Observability Stack
- Examples
Architecture
Three-Layer Logging System
┌─────────────────────────────────────────────────────────┐
│ Application Code │
│ (agents, services, tasks, endpoints) │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Domain-Based Loggers │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ agents │ │ services │ │ scheduler │ │
│ │ logger │ │ logger │ │ logger │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ ▼ │
│ Structured Context (correlation IDs) │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Output Handlers │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Console │ │ File Rotation│ │ Remote Sink │ │
│ │ (stderr) │ │ (logs/) │ │ (Opik, etc.) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Observability Platform │
│ (Opik, CloudWatch, ELK, etc.) │
└─────────────────────────────────────────────────────────┘
Domain-Based Logging
Concept
Instead of a single "application" logger, create domain-specific loggers for different subsystems:
# ✅ Good: Domain-specific loggers
logger_agents = get_logger("agents") # AI agents
logger_services = get_logger("services") # Business logic
logger_scheduler = get_logger("scheduler") # Task orchestration
logger_providers = get_logger("providers") # LLM APIs
logger_mockup = get_logger("mockup") # Testing framework
# ❌ Avoid: Single logger
logger = logging.getLogger("app") # Too generic
Standard Domains
| Domain | Purpose | Examples |
|---|---|---|
| agents | AI agent activity | Forseti execution, feature results |
| services | Business logic | API endpoints, data processing |
| scheduler | Task orchestration | APScheduler jobs, cron triggers |
| providers | LLM API calls | Ollama, OpenAI, Claude requests |
| mockup | Testing framework | Contribution generation, mutations |
| adapters | External integrations | Vaettir webhooks, N8N workflows |
| data | Data access | Redis, database operations |
| presentation | UI events | Streamlit interactions |
OCapistaine Implementation
Location: app/providers/logging.py
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler

class DomainLogger:
    """Logger for a specific domain."""

    def __init__(self, domain: str, log_dir: str = "logs"):
        self.domain = domain
        self.logger = logging.getLogger(domain)

        # Configure only once; repeated get_logger() calls reuse handlers
        if not self.logger.handlers:
            self.logger.setLevel(logging.DEBUG)

            # File handler (rotating)
            log_file = Path(log_dir) / f"{domain}.log"
            Path(log_dir).mkdir(exist_ok=True)
            handler = RotatingFileHandler(
                log_file,
                maxBytes=10 * 1024 * 1024,  # 10 MB
                backupCount=5
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

            # Console handler (stderr)
            console = logging.StreamHandler()
            console.setFormatter(formatter)
            self.logger.addHandler(console)

    def debug(self, msg: str, **kwargs):
        self.logger.debug(msg, extra={"context": kwargs})

    def info(self, msg: str, **kwargs):
        self.logger.info(msg, extra={"context": kwargs})

    def warning(self, msg: str, **kwargs):
        self.logger.warning(msg, extra={"context": kwargs})

    def error(self, msg: str, **kwargs):
        # Pop exc_info so callers can request a full traceback
        exc_info = kwargs.pop("exc_info", False)
        self.logger.error(msg, exc_info=exc_info, extra={"context": kwargs})

    def critical(self, msg: str, **kwargs):
        self.logger.critical(msg, extra={"context": kwargs})

def get_logger(domain: str) -> DomainLogger:
    """Get a logger for the given domain."""
    return DomainLogger(domain)
Structured Logging
Passing Context
Always pass structured context using extra or kwargs:
# ✅ Good: Structured context
logger.info(
"Contribution validated",
extra={
"contribution_id": contrib_id,
"status": "valid",
"rules_checked": 5,
"duration_ms": 234
}
)
# ❌ Avoid: String formatting
logger.info(f"Contribution {contrib_id} validated") # Non-parseable
Log Format
Structured JSON output (for parsing):
{
"timestamp": "2026-02-22T14:30:45.123Z",
"domain": "agents",
"level": "INFO",
"message": "Contribution validated",
"context": {
"contribution_id": "abc-123",
"status": "valid",
"rules_checked": 5,
"duration_ms": 234
}
}
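The plain-text formatter shown earlier cannot emit this shape; a small `logging.Formatter` subclass can. This is a sketch, not part of the OCapistaine codebase, reading the `context` attribute that the domain loggers attach via `extra`:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object matching the shape above."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "domain": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            "context": getattr(record, "context", {}),
        }
        return json.dumps(payload)

# Usage: format a record directly to see the output shape
formatter = JsonFormatter()
record = logging.LogRecord(
    name="agents", level=logging.INFO, pathname="", lineno=0,
    msg="Contribution validated", args=(), exc_info=None,
)
record.context = {"contribution_id": "abc-123", "rules_checked": 5}
print(formatter.format(record))
```

Attach it with `handler.setFormatter(JsonFormatter())` in place of the text formatter to make every line machine-parseable.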
Correlation IDs
Track requests across services:
import uuid
from contextvars import ContextVar

# Context variable (thread-safe, async-safe)
correlation_id: ContextVar[str | None] = ContextVar(
    "correlation_id",
    default=None
)

def set_correlation_id(cid: str):
    """Set correlation ID for this context."""
    correlation_id.set(cid)

def log_with_correlation(logger, level: str, message: str, **kwargs):
    """Log with automatic correlation ID."""
    kwargs["correlation_id"] = correlation_id.get() or str(uuid.uuid4())
    getattr(logger, level)(message, extra=kwargs)

# Usage
@app.post("/api/contribute")
async def contribute(contribution: str):
    cid = str(uuid.uuid4())
    set_correlation_id(cid)

    logger.info("Request received", extra={"correlation_id": cid})

    # Business logic
    result = await forseti.execute_feature("validate", contribution=contribution)

    logger.info("Validation complete", extra={"correlation_id": cid, "result": result})
    return result
APScheduler Integration
Task-Level Logging
Each scheduled task should log:
- Start: Task name, parameters
- Progress: Milestones, status updates
- Completion: Duration, result summary
- Errors: Full exception trace
Pattern:
from app.providers.logging import get_logger
from datetime import datetime
logger = get_logger("scheduler")
async def task_example(task_name: str, date_string: str):
    """Example scheduled task with logging."""
    start_time = datetime.now()

    try:
        logger.info(
            "Task started",
            task_id=task_name,
            date=date_string,
            timestamp=start_time.isoformat()
        )

        # Step 1
        logger.debug("Processing step 1")
        result1 = await step1()

        # Step 2
        logger.debug("Processing step 2")
        result2 = await step2(result1)

        # Success
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        logger.info(
            "Task completed",
            task_id=task_name,
            status="success",
            duration_ms=duration_ms,
            result_count=len(result2)
        )
        return {"status": "success", "result": result2}

    except Exception as e:
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        logger.error(
            "Task failed",
            task_id=task_name,
            status="failed",
            duration_ms=duration_ms,
            error=str(e),
            exc_info=True  # Full traceback
        )
        raise
Scheduler Lifecycle Logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

async def start_scheduler():
    """Scheduler startup logging."""
    logger = get_logger("scheduler")
    logger.info("Scheduler initialization")

    scheduler = AsyncIOScheduler()

    # Register jobs
    logger.debug("Registering jobs")
    scheduler.add_job(
        task_audierne_docs,
        CronTrigger(minute=20, hour="*/2"),
        id="task_audierne_docs"
    )
    logger.debug("Job registered", job_id="task_audierne_docs")

    scheduler.start()
    logger.info("Scheduler started")
    return scheduler

async def stop_scheduler(scheduler):
    """Scheduler shutdown logging."""
    logger = get_logger("scheduler")
    logger.info("Scheduler shutdown requested")

    # Get running jobs
    running = scheduler.get_jobs()
    logger.debug(f"Waiting for {len(running)} jobs to complete")

    scheduler.shutdown(wait=True)
    logger.info("Scheduler stopped")
Log Rotation
Configuration
from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler(
    "logs/agents.log",
    maxBytes=10 * 1024 * 1024,  # 10 MB per file
    backupCount=5               # Keep 5 backup files
)
# Results in: agents.log, agents.log.1, agents.log.2, ...
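The rollover behavior can be seen directly with a deliberately tiny `maxBytes` (a sketch against a temporary directory; the production values above stay at 10 MB):

```python
import logging
import tempfile
from pathlib import Path
from logging.handlers import RotatingFileHandler

# Tiny maxBytes so several rollovers happen within a few writes
log_dir = Path(tempfile.mkdtemp())
logger = logging.getLogger("rotation-demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo out of the root logger
handler = RotatingFileHandler(log_dir / "agents.log", maxBytes=200, backupCount=2)
logger.addHandler(handler)

for i in range(20):
    logger.info("message %d padded to exceed the size limit quickly", i)

handler.close()
files = sorted(p.name for p in log_dir.iterdir())
print(files)  # ['agents.log', 'agents.log.1', 'agents.log.2']
```

`backupCount=2` caps the directory at the live file plus two backups; older data is discarded on each rollover.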
Log Directory Structure
logs/
├── agents.log # AI agents
├── agents.log.1 # Rotated size-based backup
├── services.log # Business logic
├── scheduler.log # Task orchestration
├── providers.log # LLM API calls
├── mockup.log # Testing framework
└── errors.log # All errors (symlink or aggregated)
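The aggregated `errors.log` needs no symlink: because domain loggers propagate records upward by default, an ERROR-level handler on the root logger collects every domain's errors in one file. A sketch using a temporary directory:

```python
import logging
import tempfile
from pathlib import Path

log_dir = Path(tempfile.mkdtemp())

# ERROR-and-above handler on the root logger; all domain loggers
# propagate their records here, so errors aggregate into one file.
error_handler = logging.FileHandler(log_dir / "errors.log")
error_handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(error_handler)

agents = logging.getLogger("agents-agg-demo")
agents.error("Validation failed")   # written to errors.log
agents.warning("Slow response")     # below ERROR: skipped by this handler

error_handler.close()
print((log_dir / "errors.log").read_text())
```

Note this relies on the domain loggers keeping `propagate = True` (the default); disabling propagation would silence the aggregate file.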
Cleanup Strategy
# Delete logs older than 30 days
find logs/ -name "*.log.*" -mtime +30 -delete
# Compress old logs
find logs/ -name "*.log.[0-9]" -exec gzip {} \;
# Add to cron: runs daily at 2 AM
0 2 * * * find /app/logs -name "*.log.*" -mtime +30 -delete
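For deployments without cron, the same retention policy can run in Python, for example as a daily APScheduler job. A sketch (the function name and 30-day default are illustrative):

```python
import os
import tempfile
import time
from pathlib import Path

def cleanup_logs(log_dir, max_age_days: int = 30) -> int:
    """Delete rotated log files older than max_age_days; return count removed."""
    cutoff = time.time() - max_age_days * 86400
    removed = 0
    for path in Path(log_dir).glob("*.log.*"):  # rotated files only, e.g. agents.log.1
        if path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed

# Usage demo against a temp directory
demo = Path(tempfile.mkdtemp())
old = demo / "agents.log.1"
old.write_text("old rotated log")
os.utime(old, (0, 0))  # backdate the mtime far past the cutoff
(demo / "agents.log").write_text("current log")

removed = cleanup_logs(demo)
print(removed)  # 1: only the stale rotated file is deleted
```

The `*.log.*` glob deliberately never matches the live `agents.log`, so active files are safe regardless of age.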
Observability Stack
Integration with Opik
import uuid

from app.agents.tracing.opik import OpikTracer

logger = get_logger("agents")

@OpikTracer.track
async def validate_contribution(text: str):
    """Traced and logged."""
    contribution_id = str(uuid.uuid4())
    logger.info(
        "Validation started",
        contribution_id=contribution_id,
        text_length=len(text)
    )

    result = await forseti.execute_feature("validate", contribution=text)

    logger.info(
        "Validation complete",
        contribution_id=contribution_id,
        valid=result["valid"]
    )

    # Opik traces this call automatically
    return result
Log Aggregation
For production, forward logs to centralized service:
# Example: CloudWatch integration
import watchtower
cloudwatch_handler = watchtower.CloudWatchLogHandler(
log_group="ocapistaine",
stream_name="api-server"
)
logger.addHandler(cloudwatch_handler)
Examples
Example 1: Agent Logging
# app/agents/forseti/agent.py
import time

from app.providers.logging import get_logger

logger = get_logger("agents")

class Forseti(BaseAgent):
    async def validate(self, contribution: str) -> dict:
        start = time.time()
        logger.info(
            "Validation started",
            contribution_length=len(contribution)
        )

        try:
            result = await self.execute_feature(
                "validate",
                contribution=contribution
            )
            logger.info(
                "Validation complete",
                valid=result["valid"],
                duration_ms=(time.time() - start) * 1000
            )
            return result

        except Exception as e:
            logger.error(
                "Validation failed",
                error=str(e),
                exc_info=True
            )
            raise
Example 2: Task Logging
# app/services/tasks/task_audierne_docs.py
from datetime import datetime

from app.providers.logging import get_logger

logger = get_logger("scheduler")

async def task_audierne_docs(date_string: str | None = None) -> dict:
    """Process audierne documents."""
    start = datetime.now()
    logger.info(
        "Task started",
        task="task_audierne_docs",
        date=date_string
    )

    try:
        # Get unprocessed documents
        docs = get_unprocessed_docs()
        logger.debug(f"Found {len(docs)} unprocessed documents")

        for doc in docs:
            logger.debug(f"Processing: {doc['file']}")

            # Extract themes
            themes = await extract_themes(doc['content'])
            logger.debug(
                f"Extracted {len(themes)} themes",
                file=doc['file']
            )

            # Create dataset
            dataset_id = await create_opik_dataset(doc, themes)
            logger.info(
                "Dataset created",
                file=doc['file'],
                dataset_id=dataset_id
            )

        duration_ms = (datetime.now() - start).total_seconds() * 1000
        logger.info(
            "Task complete",
            status="success",
            documents_processed=len(docs),
            duration_ms=duration_ms
        )
        return {"status": "success", "documents_processed": len(docs)}

    except Exception as e:
        logger.error(
            "Task failed",
            error=str(e),
            exc_info=True
        )
        raise
Example 3: API Endpoint Logging
# app/main.py
import uuid

from fastapi import Request

from app.providers.logging import get_logger

logger = get_logger("presentation")

@app.post("/api/v1/contributions")
async def create_contribution(request: Request):
    """API endpoint with logging."""
    contribution_id = str(uuid.uuid4())
    logger.info(
        "Request received",
        endpoint="/api/v1/contributions",
        contribution_id=contribution_id,
        user_ip=request.client.host
    )

    try:
        body = await request.json()
        text = body.get("text", "")
        logger.debug(
            "Validating contribution",
            contribution_id=contribution_id,
            text_length=len(text)
        )

        # Validate
        result = await forseti.validate(text)
        logger.info(
            "Validation complete",
            contribution_id=contribution_id,
            valid=result["valid"]
        )

        return {
            "id": contribution_id,
            "valid": result["valid"],
            "reason": result.get("reason")
        }

    except Exception as e:
        logger.error(
            "Request failed",
            contribution_id=contribution_id,
            error=str(e),
            exc_info=True
        )
        raise
Best Practices
✅ DO
- Use domain loggers: One logger per subsystem
- Structured context: Always pass structured kwargs
- Correlation IDs: Track requests across services
- Log levels appropriately: DEBUG, INFO, WARNING, ERROR, CRITICAL
- Log on boundaries: Entry/exit of major operations
- Include duration: Time-consuming operations
- Full tracebacks: Use `exc_info=True` on errors
- Rotate logs: Prevent disk fill-up
❌ DON'T
- Log passwords/secrets: Never log sensitive data
- Log in tight loops: Performance impact
- String formatting in logs: Use structured context
- Bare `except`: Log the exception type
- Incomplete context: Include identifiers (IDs, names)
- Fire-and-forget: Ensure log handlers complete
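"Never log secrets" can be enforced mechanically rather than by convention: a `logging.Filter` can scrub the structured context before any handler emits the record. A sketch; the key list is an assumption to adapt per project:

```python
import logging

SENSITIVE_KEYS = {"password", "token", "api_key", "secret"}  # assumed list

class RedactFilter(logging.Filter):
    """Scrub sensitive values from the structured context before emit."""

    def filter(self, record: logging.LogRecord) -> bool:
        context = getattr(record, "context", None)
        if isinstance(context, dict):
            for key in list(context):
                if key.lower() in SENSITIVE_KEYS:
                    context[key] = "***REDACTED***"
        return True  # never drop the record, only sanitize it

# Usage: the filter mutates the context dict attached via extra
logger = logging.getLogger("services-redact-demo")
logger.addFilter(RedactFilter())
record_context = {"user": "alice", "password": "hunter2"}
logger.warning("Login attempt", extra={"context": record_context})
print(record_context)  # {'user': 'alice', 'password': '***REDACTED***'}
```

Attaching the filter to each domain logger (or to a shared handler) makes redaction a property of the pipeline instead of every call site.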
Testing
Mock Logger
import logging
from unittest.mock import MagicMock
def test_logging():
    # Create mock logger
    logger = MagicMock()

    # Test code that logs
    logger.info("test", extra={"key": "value"})

    # Assert logging called
    logger.info.assert_called_with("test", extra={"key": "value"})
Capture Logs
import logging

logger = logging.getLogger(__name__)

def test_with_logs(caplog):
    caplog.set_level(logging.DEBUG)

    # Code that logs
    logger.info("test message")

    # Assert
    assert "test message" in caplog.text
    assert caplog.records[0].levelname == "INFO"
References
- Python Logging: https://docs.python.org/3/library/logging.html
- Structured Logging: https://www.kartar.net/2015/12/structured-logging/
- Correlation IDs: https://www.w3.org/TR/trace-context/
- Opik Integration: https://www.comet.com/docs/opik
Last Updated: 2026-02-22 Branch: valkyria Reference Implementation: OCapistaine