
Logging Integration

Observable async services with structured logging and correlation

Reference implementations: OCapistaine (domain loggers), Vaettir (task logging), APScheduler coordination


Table of Contents

  1. Architecture
  2. Domain-Based Logging
  3. Structured Logging
  4. Correlation IDs
  5. APScheduler Integration
  6. Log Rotation
  7. Observability Stack
  8. Examples
  9. Best Practices
  10. Testing

Architecture

Three-Layer Logging System

┌─────────────────────────────────────────────────────────┐
│                    Application Code                     │
│          (agents, services, tasks, endpoints)           │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                  Domain-Based Loggers                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │    agents    │  │   services   │  │  scheduler   │   │
│  │    logger    │  │    logger    │  │    logger    │   │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘   │
│         │                 │                 │           │
│         └─────────────────┼─────────────────┘           │
│                           ▼                             │
│          Structured Context (correlation IDs)           │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                     Output Handlers                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │   Console    │  │ File Rotation│  │ Remote Sink  │   │
│  │   (stderr)   │  │   (logs/)    │  │ (Opik, etc.) │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                 Observability Platform                  │
│              (Opik, CloudWatch, ELK, etc.)              │
└─────────────────────────────────────────────────────────┘

Domain-Based Logging

Concept

Instead of a single "application" logger, create domain-specific loggers for different subsystems:

# ✅ Good: Domain-specific loggers
logger_agents = get_logger("agents") # AI agents
logger_services = get_logger("services") # Business logic
logger_scheduler = get_logger("scheduler") # Task orchestration
logger_providers = get_logger("providers") # LLM APIs
logger_mockup = get_logger("mockup") # Testing framework

# ❌ Avoid: Single logger
logger = logging.getLogger("app") # Too generic

Standard Domains

Domain         Purpose                 Examples
agents         AI agent activity       Forseti execution, feature results
services       Business logic          API endpoints, data processing
scheduler      Task orchestration      APScheduler jobs, cron triggers
providers      LLM API calls           Ollama, OpenAI, Claude requests
mockup         Testing framework       Contribution generation, mutations
adapters       External integrations   Vaettir webhooks, N8N workflows
data           Data access             Redis, database operations
presentation   UI events               Streamlit interactions

OCapistaine Implementation

Location: app/providers/logging.py

import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler

class DomainLogger:
    """Logger for a specific domain."""

    def __init__(self, domain: str, log_dir: str = "logs"):
        self.domain = domain
        self.logger = logging.getLogger(domain)

        # Configure only once per domain
        if not self.logger.handlers:
            self.logger.setLevel(logging.DEBUG)

            # File handler (rotating)
            log_file = Path(log_dir) / f"{domain}.log"
            Path(log_dir).mkdir(exist_ok=True)

            handler = RotatingFileHandler(
                log_file,
                maxBytes=10 * 1024 * 1024,  # 10 MB
                backupCount=5
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

            # Console handler (stderr)
            console = logging.StreamHandler()
            console.setFormatter(formatter)
            self.logger.addHandler(console)

    def debug(self, msg: str, **kwargs):
        self.logger.debug(msg, extra={"context": kwargs})

    def info(self, msg: str, **kwargs):
        self.logger.info(msg, extra={"context": kwargs})

    def warning(self, msg: str, **kwargs):
        self.logger.warning(msg, extra={"context": kwargs})

    def error(self, msg: str, **kwargs):
        # Pop exc_info so callers can request a full traceback
        exc_info = kwargs.pop("exc_info", False)
        self.logger.error(msg, exc_info=exc_info, extra={"context": kwargs})

    def critical(self, msg: str, **kwargs):
        exc_info = kwargs.pop("exc_info", False)
        self.logger.critical(msg, exc_info=exc_info, extra={"context": kwargs})

def get_logger(domain: str) -> DomainLogger:
    """Get the logger for a domain."""
    return DomainLogger(domain)
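One subtlety worth noting: `logging.getLogger(domain)` returns a process-wide singleton per name, so the `if not self.logger.handlers` guard is what keeps repeated `get_logger` calls from stacking duplicate handlers. A standalone sketch of that behavior, using a simplified stand-in for `DomainLogger` (same guard, one file handler plus one console handler) and a temporary directory:

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path

def get_logger(domain: str, log_dir: str) -> logging.Logger:
    """Simplified stand-in for DomainLogger: one file + one console handler."""
    logger = logging.getLogger(domain)
    if not logger.handlers:  # idempotency guard, as in DomainLogger
        logger.setLevel(logging.DEBUG)
        Path(log_dir).mkdir(exist_ok=True)
        logger.addHandler(RotatingFileHandler(Path(log_dir) / f"{domain}.log"))
        logger.addHandler(logging.StreamHandler())
    return logger

with tempfile.TemporaryDirectory() as tmp:
    first = get_logger("handler-guard-demo", tmp)
    second = get_logger("handler-guard-demo", tmp)  # same logger, no new handlers
    assert first is second
    assert len(first.handlers) == 2
    for h in first.handlers:  # release the file before the tmp dir is removed
        h.close()
```

Without the guard, every `get_logger` call would append two more handlers, and each log message would be written once per duplicate.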

Structured Logging

Passing Context

Always pass structured context using extra or kwargs:

# ✅ Good: Structured context
logger.info(
    "Contribution validated",
    extra={
        "contribution_id": contrib_id,
        "status": "valid",
        "rules_checked": 5,
        "duration_ms": 234
    }
)

# ❌ Avoid: String formatting
logger.info(f"Contribution {contrib_id} validated")  # Not machine-parseable

Log Format

Structured JSON output (for parsing):

{
    "timestamp": "2026-02-22T14:30:45.123Z",
    "domain": "agents",
    "level": "INFO",
    "message": "Contribution validated",
    "context": {
        "contribution_id": "abc-123",
        "status": "valid",
        "rules_checked": 5,
        "duration_ms": 234
    }
}
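The DomainLogger above uses a plain-text formatter; to emit the JSON shape shown here, a custom `logging.Formatter` can serialize each record. A minimal sketch (field names follow the example above; `context` is read from the `extra={"context": ...}` convention DomainLogger uses):

```python
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
                         .isoformat(timespec="milliseconds")
                         .replace("+00:00", "Z"),
            "domain": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            # DomainLogger passes kwargs as extra={"context": ...}
            "context": getattr(record, "context", {}),
        })

logger = logging.getLogger("agents")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Contribution validated",
            extra={"context": {"contribution_id": "abc-123", "status": "valid"}})
```

Attach this formatter to the file handler (instead of the plain formatter) to make log files parseable by downstream tooling.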

Correlation IDs

Track requests across services:

import uuid
from contextvars import ContextVar

# Context variable (thread-safe and async-safe)
correlation_id: ContextVar[str | None] = ContextVar(
    "correlation_id",
    default=None
)

def set_correlation_id(cid: str):
    """Set the correlation ID for this context."""
    correlation_id.set(cid)

def log_with_correlation(logger, level: str, message: str, **kwargs):
    """Log with an automatic correlation ID."""
    kwargs["correlation_id"] = correlation_id.get() or str(uuid.uuid4())
    getattr(logger, level)(message, extra=kwargs)

# Usage
@app.get("/api/contribute")
async def contribute(contribution: str):
    cid = str(uuid.uuid4())
    set_correlation_id(cid)

    logger.info("Request received", extra={"correlation_id": cid})

    # Business logic
    result = await forseti.execute_feature("validate", contribution=contribution)

    logger.info("Validation complete", extra={"correlation_id": cid, "result": result})
    return result

APScheduler Integration

Task-Level Logging

Each scheduled task should log:

  1. Start: Task name, parameters
  2. Progress: Milestones, status updates
  3. Completion: Duration, result summary
  4. Errors: Full exception trace

Pattern:

from app.providers.logging import get_logger
from datetime import datetime

logger = get_logger("scheduler")

async def task_example(task_name: str, date_string: str):
    """Example scheduled task with logging."""
    start_time = datetime.now()

    try:
        logger.info(
            "Task started",
            task_id=task_name,
            date=date_string,
            timestamp=start_time.isoformat()
        )

        # Step 1
        logger.debug("Processing step 1")
        result1 = await step1()

        # Step 2
        logger.debug("Processing step 2")
        result2 = await step2(result1)

        # Success
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        logger.info(
            "Task completed",
            task_id=task_name,
            status="success",
            duration_ms=duration_ms,
            result_count=len(result2)
        )

        return {"status": "success", "result": result2}

    except Exception as e:
        duration_ms = (datetime.now() - start_time).total_seconds() * 1000
        logger.error(
            "Task failed",
            task_id=task_name,
            status="failed",
            duration_ms=duration_ms,
            error=str(e),
            exc_info=True  # Full traceback
        )
        raise

Scheduler Lifecycle Logging

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

async def start_scheduler():
    """Scheduler startup logging."""
    logger = get_logger("scheduler")

    logger.info("Scheduler initialization")

    scheduler = AsyncIOScheduler()

    # Register jobs
    logger.debug("Registering jobs")
    scheduler.add_job(
        task_audierne_docs,
        CronTrigger(minute=20, hour="*/2"),
        id="task_audierne_docs"
    )
    logger.debug("Job registered", job_id="task_audierne_docs")

    scheduler.start()
    logger.info("Scheduler started")

    return scheduler

async def stop_scheduler(scheduler):
    """Scheduler shutdown logging."""
    logger = get_logger("scheduler")

    logger.info("Scheduler shutdown requested")

    # Get running jobs
    running = scheduler.get_jobs()
    logger.debug("Waiting for jobs to complete", job_count=len(running))

    scheduler.shutdown(wait=True)
    logger.info("Scheduler stopped")

Log Rotation

Configuration

from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    "logs/agents.log",
    maxBytes=10 * 1024 * 1024,  # 10 MB per file
    backupCount=5               # Keep 5 backup files
)

# Results in: agents.log, agents.log.1, agents.log.2, ...
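The rollover naming can be observed directly by shrinking `maxBytes` so rotation triggers after a few messages; a standalone sketch using a temporary directory:

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path

tmp = tempfile.mkdtemp()
log_file = Path(tmp) / "agents.log"

logger = logging.getLogger("rotation-demo")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(log_file, maxBytes=200, backupCount=3)
logger.addHandler(handler)

# ~50-byte messages against a 200-byte cap force frequent rollovers
for i in range(50):
    logger.info("message number %d with some padding to force rollover", i)
handler.close()

produced = sorted(p.name for p in Path(tmp).glob("agents.log*"))
assert "agents.log" in produced
assert "agents.log.1" in produced
assert len(produced) <= 4  # agents.log + at most backupCount backups
```

Note that `backupCount` caps total retention: once `agents.log.3` exists, the oldest backup is deleted on each subsequent rollover.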

Log Directory Structure

logs/
├── agents.log        # AI agents
├── agents.log.1      # Rotated backup
├── services.log      # Business logic
├── scheduler.log     # Task orchestration
├── providers.log     # LLM API calls
├── mockup.log        # Testing framework
└── errors.log        # All errors (symlink or aggregated)
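The aggregated `errors.log` can be produced by attaching one shared ERROR-level handler to every domain logger; a sketch of the idea, using a temporary directory in place of `logs/`:

```python
import logging
import tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())

# One shared handler that only passes ERROR and above
error_handler = logging.FileHandler(tmp / "errors.log")
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(
    logging.Formatter("%(name)s - %(levelname)s - %(message)s")
)

for domain in ("agents", "services", "scheduler"):
    dlog = logging.getLogger(f"errlog-demo.{domain}")
    dlog.setLevel(logging.DEBUG)
    dlog.addHandler(error_handler)  # same handler instance on every domain

logging.getLogger("errlog-demo.agents").info("ignored by errors.log")
logging.getLogger("errlog-demo.agents").error("agent failure")
logging.getLogger("errlog-demo.scheduler").error("job failure")
error_handler.close()

content = (tmp / "errors.log").read_text()
assert "agent failure" in content and "job failure" in content
assert "ignored" not in content
```

Because handlers filter by their own level independently of the logger's level, INFO records still reach each domain's own file while only ERROR and above land in the shared sink.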

Cleanup Strategy

# Delete logs older than 30 days
find logs/ -name "*.log.*" -mtime +30 -delete

# Compress old logs
find logs/ -name "*.log.[0-9]" -exec gzip {} \;

# Add to cron: runs daily at 2 AM
0 2 * * * find /app/logs -name "*.log.*" -mtime +30 -delete

Observability Stack

Integration with Opik

import uuid

from app.agents.tracing.opik import OpikTracer

logger = get_logger("agents")

@OpikTracer.track
async def validate_contribution(text: str):
    """Traced and logged."""
    contribution_id = str(uuid.uuid4())

    logger.info(
        "Validation started",
        contribution_id=contribution_id,
        text_length=len(text)
    )

    result = await forseti.execute_feature("validate", contribution=text)

    logger.info(
        "Validation complete",
        contribution_id=contribution_id,
        valid=result["valid"]
    )

    # Opik traces this call automatically
    return result

Log Aggregation

For production, forward logs to a centralized service:

# Example: CloudWatch integration
import watchtower

cloudwatch_handler = watchtower.CloudWatchLogHandler(
    log_group="ocapistaine",
    stream_name="api-server"
)

logger.addHandler(cloudwatch_handler)

Examples

Example 1: Agent Logging

# app/agents/forseti/agent.py
import time

from app.providers.logging import get_logger

logger = get_logger("agents")

class Forseti(BaseAgent):
    async def validate(self, contribution: str) -> dict:
        start = time.time()

        logger.info(
            "Validation started",
            contribution_length=len(contribution)
        )

        try:
            result = await self.execute_feature(
                "validate",
                contribution=contribution
            )

            logger.info(
                "Validation complete",
                valid=result["valid"],
                duration_ms=(time.time() - start) * 1000
            )

            return result

        except Exception as e:
            logger.error(
                "Validation failed",
                error=str(e),
                exc_info=True
            )
            raise

Example 2: Task Logging

# app/services/tasks/task_audierne_docs.py
from datetime import datetime

from app.providers.logging import get_logger

logger = get_logger("scheduler")

async def task_audierne_docs(date_string: str | None = None) -> dict:
    """Process Audierne documents."""
    start = datetime.now()

    logger.info(
        "Task started",
        task="task_audierne_docs",
        date=date_string
    )

    try:
        # Get unprocessed documents
        docs = get_unprocessed_docs()
        logger.debug("Found unprocessed documents", count=len(docs))

        for doc in docs:
            logger.debug("Processing document", file=doc['file'])

            # Extract themes
            themes = await extract_themes(doc['content'])
            logger.debug(
                "Themes extracted",
                file=doc['file'],
                theme_count=len(themes)
            )

            # Create dataset
            dataset_id = await create_opik_dataset(doc, themes)
            logger.info(
                "Dataset created",
                file=doc['file'],
                dataset_id=dataset_id
            )

        duration_ms = (datetime.now() - start).total_seconds() * 1000
        logger.info(
            "Task complete",
            status="success",
            documents_processed=len(docs),
            duration_ms=duration_ms
        )

        return {"status": "success", "documents_processed": len(docs)}

    except Exception as e:
        logger.error(
            "Task failed",
            error=str(e),
            exc_info=True
        )
        raise

Example 3: API Endpoint Logging

# app/main.py
import uuid

from fastapi import Request

from app.providers.logging import get_logger

logger = get_logger("presentation")

@app.post("/api/v1/contributions")
async def create_contribution(request: Request):
    """API endpoint with logging."""
    contribution_id = str(uuid.uuid4())

    logger.info(
        "Request received",
        endpoint="/api/v1/contributions",
        contribution_id=contribution_id,
        user_ip=request.client.host
    )

    try:
        body = await request.json()
        text = body.get("text", "")

        logger.debug(
            "Validating contribution",
            contribution_id=contribution_id,
            text_length=len(text)
        )

        # Validate
        result = await forseti.validate(text)

        logger.info(
            "Validation complete",
            contribution_id=contribution_id,
            valid=result["valid"]
        )

        return {
            "id": contribution_id,
            "valid": result["valid"],
            "reason": result.get("reason")
        }

    except Exception as e:
        logger.error(
            "Request failed",
            contribution_id=contribution_id,
            error=str(e),
            exc_info=True
        )
        raise

Best Practices

✅ DO

  • Use domain loggers: One logger per subsystem
  • Structured context: Always pass structured kwargs
  • Correlation IDs: Track requests across services
  • Log levels appropriately: DEBUG, INFO, WARNING, ERROR, CRITICAL
  • Log on boundaries: Entry/exit of major operations
  • Include duration: Time-consuming operations
  • Full tracebacks: Use exc_info=True on errors
  • Rotate logs: Prevent disk fill-up

❌ DON'T

  • Log passwords/secrets: Never log sensitive data
  • Log in tight loops: Performance impact
  • String formatting in logs: Use structured context
  • Bare except: Log the exception type
  • Incomplete context: Include identifiers (IDs, names)
  • Fire-and-forget: Ensure log handlers complete
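The "never log secrets" rule can also be enforced mechanically with a `logging.Filter` that masks sensitive keys in the structured context before any handler sees the record. A minimal sketch (the key list is illustrative, not exhaustive):

```python
import logging

SENSITIVE_KEYS = {"password", "token", "secret", "api_key"}  # illustrative list

class RedactContextFilter(logging.Filter):
    """Mask sensitive values in a record's structured context."""

    def filter(self, record: logging.LogRecord) -> bool:
        context = getattr(record, "context", None)
        if isinstance(context, dict):
            record.context = {
                k: "***" if k.lower() in SENSITIVE_KEYS else v
                for k, v in context.items()
            }
        return True  # never drop the record, only redact it

logger = logging.getLogger("redact-demo")
logger.addFilter(RedactContextFilter())
```

A filter is a safety net, not a substitute for not passing secrets to the logger in the first place, since it only catches the keys it knows about.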

Testing

Mock Logger

from unittest.mock import MagicMock

def test_logging():
    # Create a mock logger
    logger = MagicMock()

    # Code under test that logs
    logger.info("test", extra={"key": "value"})

    # Assert the logging call was made
    logger.info.assert_called_with("test", extra={"key": "value"})

Capture Logs

import logging

def test_with_logs(caplog):
    caplog.set_level(logging.DEBUG)

    # Code that logs
    logger = logging.getLogger("agents")
    logger.info("test message")

    # Assert
    assert "test message" in caplog.text
    assert caplog.records[0].levelname == "INFO"

Last Updated: 2026-02-22 Branch: valkyria Reference Implementation: OCapistaine