Agents Framework

Composable AI agent architecture for production applications

Reference implementations: OCapistaine (Forseti), Vaettir (MCP agents), custom agents


Table of Contents

  1. Core Concept
  2. Architecture
  3. BaseAgent Class
  4. AgentFeature Protocol
  5. Provider Injection
  6. Building Agents
  7. Feature Composition
  8. Testing
  9. Examples
  10. Design Principles (incl. Provider-Aware Chunking)

Core Concept

The Three-Layer Pattern

┌─────────────────────────────────────────────────────────┐
│ BaseAgent │
│ (WHO the agent is: persona, identity, system prompt) │
├─────────────────────────────────────────────────────────┤
│ Features │
│ (WHAT the agent does: composable functionality units) │
├─────────────────────────────────────────────────────────┤
│ Provider │
│ (HOW to generate responses: LLM API with failover) │
└─────────────────────────────────────────────────────────┘

Why This Pattern?

Benefit        How
─────────────  ──────────────────────────────────────────────────────────────
Modularity     Features are independent units; register/unregister at runtime
Testability    Mock providers; test features in isolation
Reusability    The same feature can be reused across multiple agents
Flexibility    Swap providers (Ollama → Claude → Gemini) without code changes
Observability  Features inherit tracing, logging, and metrics
Scalability    Add new features without modifying the base agent

Architecture

Component Diagram

Agent Request


┌──────────────────────────────────┐
│ BaseAgent │
│ ┌────────────────────────────┐ │
│ │ persona_prompt: str │ │ WHO: System prompt defining agent identity
│ │ provider: LLMProvider │ │ PROVIDER: LLM API + failover chain
│ │ features: dict[Feature] │ │ FEATURES: Composable functionality
│ └────────────────────────────┘ │
└──────────────────────────────────┘

├─→ execute_feature(name, **kwargs)
│ │
│ ▼
│ ┌─────────────────────────┐
│ │ AgentFeature │
│ ├─────────────────────────┤
│ │ • name: str │
│ │ • prompt: str │
│ │ • execute(...) │ WHAT: Specific task (validate, classify, etc.)
│ └─────────────────────────┘
│ │
│ ├─→ With system_prompt
│ ├─→ With provider
│ └─→ With **kwargs (feature-specific args)
│ │
│ ▼
│ ┌─────────────────────────┐
│ │ LLMProvider │
│ ├─────────────────────────┤
│ │ Provider.complete() │ HOW: Call LLM API
│ │ + Failover chain │ - Ollama (local)
│ │ + Rate limiting │ - OpenAI (gpt-4o-mini)
│ │ + Tracing │ - Claude (anthropic)
│ │ + Retry logic │ - Mistral
│ │ │ - Gemini
│ └─────────────────────────┘
│ │
│ ▼
│ ┌─────────────────────────┐
│ │ LLM Response │
│ │ {content, tokens, ...} │
│ └─────────────────────────┘

└─→ Feature Result
(feature-specific output format)

BaseAgent Class

Definition

Location: app/agents/base.py

from abc import ABC, abstractmethod
from typing import Any, Protocol, runtime_checkable

from app.providers import LLMProvider, Message, get_provider


@runtime_checkable
class AgentFeature(Protocol):
    """Protocol defining agent feature interface."""

    @property
    def name(self) -> str:
        """Unique feature identifier."""
        ...

    @property
    def prompt(self) -> str:
        """Feature prompt template."""
        ...

    async def execute(
        self,
        provider: LLMProvider,
        system_prompt: str,
        **kwargs,
    ) -> Any:
        """Execute the feature."""
        ...


class BaseAgent(ABC):
    """Base class for all agents with feature composition."""

    def __init__(
        self,
        provider: LLMProvider | None = None,
        provider_name: str | None = None,
    ):
        """
        Initialize agent with provider.

        Args:
            provider: Optional LLMProvider instance
            provider_name: Optional provider name (uses default if not specified)
        """
        self._provider = provider or get_provider(provider_name)
        self._features: dict[str, AgentFeature] = {}

    @property
    @abstractmethod
    def persona_prompt(self) -> str:
        """Agent's identity/system prompt."""
        ...

    def register_feature(self, feature: AgentFeature) -> None:
        """Register a feature."""
        if feature.name in self._features:
            raise ValueError(f"Feature '{feature.name}' already registered")
        self._features[feature.name] = feature

    async def execute_feature(
        self,
        feature_name: str,
        **kwargs,
    ) -> Any:
        """Execute a specific feature."""
        if feature_name not in self._features:
            raise KeyError(f"Feature '{feature_name}' not registered")

        feature = self._features[feature_name]
        return await feature.execute(
            provider=self._provider,
            system_prompt=self.persona_prompt,
            **kwargs,
        )

Key Methods

Method                           Purpose
───────────────────────────────  ──────────────────────────────────────────
persona_prompt (property)        System prompt defining WHO the agent is
register_feature(feature)        Add functionality to the agent at runtime
unregister_feature(name)         Remove a feature from the agent
has_feature(name)                Check whether a feature is registered
execute_feature(name, **kwargs)  Run a specific feature
execute_all(**kwargs)            Run all registered features
complete(user_message, ...)      Simple completion using the agent persona
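
The code excerpt above only shows `register_feature` and `execute_feature`. A minimal sketch of how the remaining helpers from the table might be implemented (the bodies below are assumptions; the real implementations live in app/agents/base.py and may differ):

```python
# Hypothetical sketch of the remaining BaseAgent helpers — not the actual source.
from typing import Any


class BaseAgentHelpers:
    """Sketch only: assumes _features, _provider, and persona_prompt as in BaseAgent."""

    persona_prompt: str = "sketch persona"

    def __init__(self) -> None:
        self._features: dict[str, Any] = {}
        self._provider: Any = None

    def unregister_feature(self, name: str) -> None:
        """Remove a feature; raise if it was never registered."""
        if name not in self._features:
            raise KeyError(f"Feature '{name}' not registered")
        del self._features[name]

    def has_feature(self, name: str) -> bool:
        """Check whether a feature is registered."""
        return name in self._features

    async def execute_all(self, **kwargs) -> dict[str, Any]:
        """Run every registered feature; results keyed by feature name."""
        return {
            name: await feature.execute(
                provider=self._provider,
                system_prompt=self.persona_prompt,
                **kwargs,
            )
            for name, feature in self._features.items()
        }

    async def complete(self, user_message: str, **kwargs) -> Any:
        """Plain completion under the agent persona, bypassing features."""
        # The real code would build Message objects rather than raw dicts.
        return await self._provider.complete(
            messages=[
                {"role": "system", "content": self.persona_prompt},
                {"role": "user", "content": user_message},
            ],
            **kwargs,
        )
```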

AgentFeature Protocol

Definition

@runtime_checkable
class AgentFeature(Protocol):
    """Defines what makes a valid agent feature."""

    @property
    def name(self) -> str:
        """Unique identifier."""
        ...

    @property
    def prompt(self) -> str:
        """Feature prompt template (can use placeholders)."""
        ...

    async def execute(
        self,
        provider: LLMProvider,
        system_prompt: str,
        **kwargs,
    ) -> Any:
        """
        Execute the feature.

        Args:
            provider: LLM provider to use
            system_prompt: Agent's system prompt (to combine with feature prompt)
            **kwargs: Feature-specific arguments

        Returns:
            Feature-specific result (dict, str, object, etc.)
        """
        ...

Feature Implementation Pattern

import json
from typing import Any

from app.providers import LLMProvider, Message


class MyFeature:
    """Example feature implementation."""

    @property
    def name(self) -> str:
        return "my_feature"

    @property
    def prompt(self) -> str:
        return """
        Analyze the following text and identify key themes.

        Return a JSON object:
        {
            "themes": ["theme1", "theme2"],
            "confidence": 0.95
        }
        """

    async def execute(
        self,
        provider: LLMProvider,
        system_prompt: str,
        text: str = "",  # Feature-specific kwarg
        **kwargs,
    ) -> dict[str, Any]:
        """Execute theme analysis."""
        messages = [
            Message(role="system", content=system_prompt),
            Message(role="user", content=f"Text:\n{text}"),
        ]

        response = await provider.complete(
            messages=messages,
            json_mode=True,  # Request structured output
        )

        return json.loads(response.content)

Provider Injection

LLMProvider Interface

class LLMProvider(Protocol):
    """LLM provider interface."""

    async def complete(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        json_mode: bool = False,
        timeout: int = 30,
    ) -> LLMResponse:
        """Generate completion from messages."""
        ...

Failover Chain

OCapistaine implements automatic failover:

from app.providers.failover import ProviderWithFailover

# Primary: Ollama (local, free)
# Fallback: OpenAI, Claude, Mistral, Gemini
provider = ProviderWithFailover(
    primary="ollama",
    failover_chain=["openai", "claude", "mistral", "gemini"],
)

# Or specify priority order
provider = ProviderWithFailover(
    primary="claude",                     # Try Claude first
    failover_chain=["gemini", "openai"],  # Then these
)

Provider Configuration

from app.providers import get_provider

# Get default provider (configured in env)
provider = get_provider()

# Get specific provider
provider = get_provider("claude")
provider = get_provider("gemini")
provider = get_provider("ollama")

# Inject into agent
agent = Forseti(provider=provider)

Building Agents

Step 1: Define Persona

from app.agents.base import BaseAgent


class MyAgent(BaseAgent):
    @property
    def persona_prompt(self) -> str:
        return """
        You are an expert analyst for civic participation.
        Your role is to understand citizen contributions and classify them.

        Be precise, fair, and transparent in your analysis.
        Always cite the rules you're applying.
        """

Step 2: Create Features

import json

from app.providers import LLMProvider, Message


class ValidationFeature:
    @property
    def name(self) -> str:
        return "validate"

    @property
    def prompt(self) -> str:
        return """
        Check if this contribution complies with the charter.

        Return:
        {
            "valid": boolean,
            "reason": string,
            "violations": [list of violated rules]
        }
        """

    async def execute(
        self,
        provider: LLMProvider,
        system_prompt: str,
        contribution: str = "",
        **kwargs,
    ) -> dict:
        messages = [
            Message(role="system", content=system_prompt),
            Message(role="user", content=f"Contribution:\n{contribution}"),
        ]
        response = await provider.complete(messages=messages, json_mode=True)
        return json.loads(response.content)


class ClassificationFeature:
    @property
    def name(self) -> str:
        return "classify"

    @property
    def prompt(self) -> str:
        return """
        Classify this contribution into one of: environment, transport, social, culture, other.

        Return:
        {
            "category": string,
            "confidence": float
        }
        """

    async def execute(
        self,
        provider: LLMProvider,
        system_prompt: str,
        contribution: str = "",
        **kwargs,
    ) -> dict:
        # Similar implementation...
        ...

Step 3: Register Features

class MyAgent(BaseAgent):
    def __init__(self, provider: LLMProvider | None = None):
        super().__init__(provider=provider)

        # Register features
        self.register_feature(ValidationFeature())
        self.register_feature(ClassificationFeature())

    @property
    def persona_prompt(self) -> str:
        return "..."

Step 4: Use the Agent

# Initialize agent
agent = MyAgent(provider=get_provider("claude"))

# Execute single feature
validation_result = await agent.execute_feature(
    "validate",
    contribution="Je propose d'améliorer l'éclairage du port",
)

# Execute all features
results = await agent.execute_all(
    contribution="Je propose d'améliorer l'éclairage du port"
)
# results = {
#     "validate": {"valid": True, "reason": "..."},
#     "classify": {"category": "environment", "confidence": 0.95}
# }

Feature Composition

Multiple Agents Sharing Features

Features can be reused across agents:

# Feature definition (reusable)
class TokenCounterFeature:
    @property
    def name(self) -> str:
        return "count_tokens"

    async def execute(self, provider, system_prompt, text="", **kwargs):
        return {"tokens": len(text.split())}


# Agent 1: Forseti (charter validation)
class Forseti(BaseAgent):
    def __init__(self):
        super().__init__()
        self.register_feature(ValidationFeature())
        self.register_feature(ClassificationFeature())
        self.register_feature(TokenCounterFeature())  # Shared


# Agent 2: RAG Agent (document analysis)
class RAGAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.register_feature(RetrievalFeature())
        self.register_feature(TokenCounterFeature())  # Shared

Nested Composition

class ComposedAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.forseti = Forseti()  # Nested agent
        self.rag = RAGAgent()     # Another nested agent

    @property
    def persona_prompt(self) -> str:
        # Required: BaseAgent declares persona_prompt as abstract
        return "You orchestrate validation and retrieval agents."

    async def orchestrate(self, contribution: str):
        """Run multiple agents sequentially."""
        # 1. Validate with Forseti
        validation = await self.forseti.execute_feature(
            "validate",
            contribution=contribution,
        )

        if not validation["valid"]:
            return {"status": "rejected", "reason": validation["reason"]}

        # 2. Retrieve context with RAG
        context = await self.rag.execute_feature(
            "retrieve",
            query=contribution,
        )

        return {
            "status": "accepted",
            "context": context,
            "validation": validation,
        }

Testing

Mock Provider

from app.providers import LLMProvider, LLMResponse, Message


class MockProvider(LLMProvider):
    """Mock provider for testing."""

    def __init__(self, response_map: dict[str, str] | None = None):
        self.response_map = response_map or {}

    async def complete(
        self,
        messages: list[Message],
        **kwargs,
    ) -> LLMResponse:
        """Return predefined response based on user message."""
        user_message = next(
            (m.content for m in messages if m.role == "user"),
            "unknown",
        )

        response = self.response_map.get(
            user_message,
            '{"result": "default"}',
        )

        return LLMResponse(
            content=response,
            tokens=len(response.split()),
        )


# Usage
mock_provider = MockProvider({
    "Contribution:\nTest contribution": '{"valid": true, "reason": "OK"}'
})

agent = MyAgent(provider=mock_provider)
result = await agent.execute_feature("validate", contribution="Test contribution")
assert result["valid"] is True

Feature Unit Tests

import pytest


@pytest.mark.asyncio
async def test_validation_feature():
    """Test validation feature in isolation."""
    mock_provider = MockProvider({
        "Contribution:\nInvalid": '{"valid": false, "reason": "Violates rule 5"}'
    })

    agent = MyAgent(provider=mock_provider)
    result = await agent.execute_feature(
        "validate",
        contribution="Invalid",
    )

    assert result["valid"] is False
    assert "rule 5" in result["reason"]


@pytest.mark.asyncio
async def test_agent_feature_registration():
    """Test that features are registered correctly."""
    agent = MyAgent()

    assert agent.has_feature("validate")
    assert agent.has_feature("classify")
    assert not agent.has_feature("unknown")


@pytest.mark.asyncio
async def test_provider_injection():
    """Test provider can be injected."""
    custom_provider = MockProvider()
    agent = MyAgent(provider=custom_provider)

    assert agent.provider == custom_provider

Examples

Example 1: Forseti Agent (Charter Validation)

Location: app/agents/forseti/agent.py

from app.agents.base import BaseAgent
from app.agents.forseti.features import (
    ComplianceFeature,
    ClassificationFeature,
    AnonymizationFeature,
)
from app.providers import LLMProvider, get_provider


class Forseti(BaseAgent):
    """Charter compliance validation agent."""

    def __init__(self, provider: LLMProvider | None = None):
        super().__init__(provider=provider)

        # Register features
        self.register_feature(ComplianceFeature())
        self.register_feature(ClassificationFeature())
        self.register_feature(AnonymizationFeature())

    @property
    def persona_prompt(self) -> str:
        return """
        You are Forseti, the civic participation charter guardian.

        Your role is to:
        1. Validate contributions against the participation charter
        2. Classify contributions by theme (environment, transport, etc.)
        3. Anonymize sensitive information when needed

        Be fair, transparent, and cite the charter rules you apply.
        """


# Usage
forseti = Forseti(provider=get_provider("claude"))

# Validate a contribution
result = await forseti.execute_feature(
    "compliance",
    contribution="Je propose l'amélioration de l'éclairage du port",
)
print(result)
# {
#     "compliant": True,
#     "issues": [],
#     "charter_rules_checked": ["rule_1", "rule_2", "rule_3"]
# }

Example 2: Custom RAG Agent

from app.agents.base import BaseAgent


class RAGAgent(BaseAgent):
    """Retrieval-Augmented Generation agent."""

    def __init__(self, vector_store, provider: LLMProvider | None = None):
        super().__init__(provider=provider)
        self.vector_store = vector_store

        self.register_feature(RetrievalFeature(vector_store))
        self.register_feature(GenerationFeature())

    @property
    def persona_prompt(self) -> str:
        return """
        You are a knowledgeable assistant for municipal governance.
        Use provided documents to answer questions accurately.
        Cite your sources.
        """


# Usage
rag = RAGAgent(
    vector_store=vector_db,
    provider=get_provider("ollama"),  # Local LLM
)

answer = await rag.execute_feature(
    "generate",
    query="What are the current traffic policies?",
    documents=[...],  # Retrieved context
)

Design Principles

1. Separation of Concerns

  • BaseAgent: WHO (identity, persona)
  • Features: WHAT (specific tasks)
  • Provider: HOW (LLM API implementation)
# ✅ Good: Each layer has a single responsibility
agent = Forseti(provider=get_provider("claude"))

# ❌ Avoid: Mixed responsibilities
class BadAgent:
    async def validate_and_call_api(self):  # Multiple concerns
        ...

2. Dependency Injection

  • Agent takes provider as dependency
  • Features receive provider at execute time
  • Enables testing with mock providers
# ✅ Good: Injected dependency
agent = Forseti(provider=mock_provider)

# ❌ Avoid: Creating the dependency internally
class BadAgent:
    def __init__(self):
        self.provider = get_provider()  # Hard to test

3. Fail Safe with Fallbacks

  • Primary provider preference specified
  • Automatic fallback to alternative providers
  • Always has a working provider (even if degraded)
# ✅ Good: With failover
provider = ProviderWithFailover(
    primary="ollama",
    failover_chain=["gemini", "openai"],
)

# ❌ Avoid: Single provider
provider = OllamaProvider()  # Fails if Ollama is down
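
The core failover behavior can be sketched as a try-in-order loop (illustrative only; the real ProviderWithFailover in app/providers/failover.py also adds rate limiting, tracing, and retry logic):

```python
# Hypothetical sketch of a failover loop — not the real ProviderWithFailover.
import asyncio


class FailoverSketch:
    """Try each provider's complete() in order until one succeeds."""

    def __init__(self, providers: list):
        self.providers = providers

    async def complete(self, *args, **kwargs):
        last_error: Exception | None = None
        for provider in self.providers:
            try:
                return await provider.complete(*args, **kwargs)
            except Exception as e:  # fall through to the next provider
                last_error = e
        raise RuntimeError("All providers failed") from last_error
```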

4. Observable by Default

  • Features work with Opik tracing
  • Structured logging inherited from BaseAgent
  • Metrics collected automatically
# ✅ Good: Traced and observable
@OpikTracer.track
async def process(contribution: str):
    result = await forseti.execute_feature("validate", contribution=contribution)
    logger.info("Validation complete", extra={"result": result})
    return result

5. Provider-Aware Chunking

Input size is a cross-cutting concern between Features (WHAT) and Provider (HOW). A feature that works within Claude's 200k-token context will time out on a local 7B model. The chunking strategy must therefore adapt to the provider's capabilities.

The Problem

Provider Constraint        Impact on Feature
─────────────────────────  ──────────────────────────────────
Context window (tokens)  → Max input per call
Inference speed (tok/s)  → Timeout budget
JSON mode overhead       → Structured output adds latency
Model size (7B vs 70B)   → Quality vs speed tradeoff

A 58k character document:

  • Claude/Gemini: 1 chunk, completes in seconds
  • Ollama deepseek-r1:7b: 8 chunks of 8k, 2 min each = 16 min total
  • Ollama deepseek-r1:7b with 15k chunks: times out on every chunk (120s budget exhausted)

Chunk Size Guidelines

Provider Class                       Chunk Size    Timeout  Notes
───────────────────────────────────  ────────────  ───────  ──────────────────────────────────────────────
Cloud APIs (Claude, Gemini, OpenAI)  15-30k chars  30-60s   Large context, fast inference
Local large models (Ollama 70B+)     10-15k chars  180s     Good context, slower inference
Local small models (Ollama 7B-14B)   6-8k chars    120s     Limited context; JSON mode adds overhead
Local tiny models (Ollama <7B)       3-5k chars    120s     Minimal context; quality degrades on long input
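
These guidelines can be encoded as a lookup so features never hardcode one size. The class names and exact numbers below mirror the table; mapping a concrete model to a class is an assumption, not part of the framework:

```python
# Illustrative chunk-config lookup derived from the guideline table above.
from dataclasses import dataclass


@dataclass(frozen=True)
class ChunkConfig:
    chunk_size: int  # characters per chunk
    timeout: int     # seconds allowed per LLM call


CHUNK_CONFIGS = {
    "cloud": ChunkConfig(chunk_size=20_000, timeout=60),
    "local_large": ChunkConfig(chunk_size=12_000, timeout=180),
    "local_small": ChunkConfig(chunk_size=8_000, timeout=120),
    "local_tiny": ChunkConfig(chunk_size=4_000, timeout=120),
}


def chunk_config_for(provider_class: str) -> ChunkConfig:
    """Unknown providers fall back to the most conservative config."""
    return CHUNK_CONFIGS.get(provider_class, CHUNK_CONFIGS["local_tiny"])
```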

Implementation Pattern

Features that process documents should accept chunk configuration rather than hardcoding it:

# In a feature or processor
CHUNK_SIZE = 8000    # Safe default for the smallest supported provider
CHUNK_OVERLAP = 500  # Paragraph-boundary overlap

chunks = split_into_chunks(text, CHUNK_SIZE, CHUNK_OVERLAP)

for i, chunk in enumerate(chunks):
    messages = [
        Message(role="system", content=system_prompt),
        Message(role="user", content=chunk),
    ]
    result = await provider.complete(messages=messages, json_mode=True)

Key rules:

  • Default to the smallest provider you support (currently 7B local models = 8k chunks)
  • Overlap at paragraph boundaries to avoid cutting mid-sentence
  • Log chunk count and size so timeouts are diagnosable
  • Empty str(exception) trap: httpx.ReadTimeout stringifies to "" — log type(e).__name__ or repr(e) instead of relying on str(e) alone
  • Each chunk = one LLM call: if a chunk times out, the feature should log and continue with remaining chunks rather than failing entirely
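
A sketch of `split_into_chunks` honoring the rules above, preferring paragraph boundaries and overlapping adjacent chunks (illustrative; the production helper may count tokens rather than characters):

```python
def split_into_chunks(text: str, chunk_size: int = 8000, overlap: int = 500) -> list[str]:
    """Split text into ~chunk_size pieces, cutting at paragraph breaks when possible.

    Sketch only — assumes character-based sizing.
    """
    if len(text) <= chunk_size:
        return [text] if text else []

    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        if end < len(text):
            # Prefer to cut at the last paragraph break inside the window.
            cut = text.rfind("\n\n", start, end)
            if cut > start:
                end = cut
        chunks.append(text[start:end])
        if end >= len(text):
            break
        # Step back by `overlap` so boundary sentences appear in both chunks.
        start = max(end - overlap, start + 1)
    return chunks
```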

Chunking and the ABC Layers

BaseAgent (WHO)

├── Feature (WHAT)
│ └── Chunking lives HERE
│ - Split input into provider-safe sizes
│ - Deduplicate results across chunks
│ - Aggregate partial results

└── Provider (HOW)
└── Constraints come from HERE
- Context window, timeout, speed
- Provider.complete() is chunk-unaware

The Provider layer stays chunk-unaware — it processes single requests. The Feature layer owns chunking because it understands how to split and reassemble results for its specific task (theme extraction deduplicates, anonymization applies mappings globally, validation checks each chunk independently).
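
As an illustration of Feature-layer aggregation, a hypothetical theme-extraction feature might merge its per-chunk results like this (the function name and result shape are assumptions matching the MyFeature example above):

```python
def merge_chunk_themes(chunk_results: list[dict]) -> dict:
    """Deduplicate themes across chunks (case-insensitive), keeping the
    highest confidence seen for each. Illustrative aggregation only."""
    best: dict[str, tuple[str, float]] = {}
    for result in chunk_results:
        confidence = float(result.get("confidence", 0.0))
        for theme in result.get("themes", []):
            key = theme.strip().lower()
            if key not in best or confidence > best[key][1]:
                best[key] = (theme.strip(), confidence)
    return {
        "themes": [name for name, _ in best.values()],
        "confidence": max((c for _, c in best.values()), default=0.0),
    }
```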


References

  • OCapistaine Forseti: app/agents/forseti/
  • Provider Interface: app/providers/base.py
  • Opik Tracing: app/agents/tracing/opik.py

Last Updated: 2026-03-04 Branch: dev Reference Implementation: OCapistaine Forseti Agent