Agents Framework
Composable AI agent architecture for production applications
Reference implementations: OCapistaine (Forseti), Vaettir (MCP agents), custom agents
Table of Contents
- Core Concept
- Architecture
- BaseAgent Class
- AgentFeature Protocol
- Provider Injection
- Building Agents
- Feature Composition
- Testing
- Examples
- Design Principles (incl. Provider-Aware Chunking)
Core Concept
The Three-Layer Pattern
┌─────────────────────────────────────────────────────────┐
│ BaseAgent │
│ (WHO the agent is: persona, identity, system prompt) │
├─────────────────────────────────────────────────────────┤
│ Features │
│ (WHAT the agent does: composable functionality units) │
├─────────────────────────────────────────────────────────┤
│ Provider │
│ (HOW to generate responses: LLM API with failover) │
└─────────────────────────────────────────────────────────┘
Why This Pattern?
| Benefit | How |
|---|---|
| Modularity | Features are independent units that can be registered/unregistered at runtime |
| Testability | Mock providers, test features in isolation |
| Reusability | Same feature used in multiple agents |
| Flexibility | Swap providers (Ollama → Claude → Gemini) without code changes |
| Observability | Features inherit tracing, logging, metrics |
| Scalability | Add new features without modifying base agent |
Architecture
Component Diagram
Agent Request
│
▼
┌──────────────────────────────────┐
│ BaseAgent │
│ ┌────────────────────────────┐ │
│ │ persona_prompt: str │ │ WHO: System prompt defining agent identity
│ │ provider: LLMProvider │ │ PROVIDER: LLM API + failover chain
│ │ features: dict[Feature] │ │ FEATURES: Composable functionality
│ └────────────────────────────┘ │
└──────────────────────────────────┘
│
├─→ execute_feature(name, **kwargs)
│ │
│ ▼
│ ┌─────────────────────────┐
│ │ AgentFeature │
│ ├─────────────────────────┤
│ │ • name: str │
│ │ • prompt: str │
│ │ • execute(...) │ WHAT: Specific task (validate, classify, etc.)
│ └─────────────────────────┘
│ │
│ ├─→ With system_prompt
│ ├─→ With provider
│ └─→ With **kwargs (feature-specific args)
│ │
│ ▼
│ ┌─────────────────────────┐
│ │ LLMProvider │
│ ├─────────────────────────┤
│ │ Provider.complete() │ HOW: Call LLM API
│ │ + Failover chain │ - Ollama (local)
│ │ + Rate limiting │ - OpenAI (gpt-4o-mini)
│ │ + Tracing │ - Claude (anthropic)
│ │ + Retry logic │ - Mistral
│ │ │ - Gemini
│ └─────────────────────────┘
│ │
│ ▼
│ ┌─────────────────────────┐
│ │ LLM Response │
│ │ {content, tokens, ...} │
│ └─────────────────────────┘
│
└─→ Feature Result
(feature-specific output format)
BaseAgent Class
Definition
Location: app/agents/base.py
from abc import ABC, abstractmethod
from typing import Any, Protocol, runtime_checkable
from app.providers import LLMProvider, Message, get_provider
@runtime_checkable
class AgentFeature(Protocol):
"""Protocol defining agent feature interface."""
@property
def name(self) -> str:
"""Unique feature identifier."""
...
@property
def prompt(self) -> str:
"""Feature prompt template."""
...
async def execute(
self,
provider: LLMProvider,
system_prompt: str,
**kwargs,
) -> Any:
"""Execute the feature."""
...
class BaseAgent(ABC):
"""Base class for all agents with feature composition."""
def __init__(
self,
provider: LLMProvider | None = None,
provider_name: str | None = None,
):
"""
Initialize agent with provider.
Args:
provider: Optional LLMProvider instance
provider_name: Optional provider name (uses default if not specified)
"""
self._provider = provider or get_provider(provider_name)
self._features: dict[str, AgentFeature] = {}
@property
@abstractmethod
def persona_prompt(self) -> str:
"""Agent's identity/system prompt."""
...
def register_feature(self, feature: AgentFeature) -> None:
"""Register a feature."""
if feature.name in self._features:
raise ValueError(f"Feature '{feature.name}' already registered")
self._features[feature.name] = feature
async def execute_feature(
self,
feature_name: str,
**kwargs,
) -> Any:
"""Execute a specific feature."""
if feature_name not in self._features:
raise KeyError(f"Feature '{feature_name}' not registered")
feature = self._features[feature_name]
return await feature.execute(
provider=self._provider,
system_prompt=self.persona_prompt,
**kwargs,
)
Key Methods
| Method | Purpose |
|---|---|
| `persona_prompt` (property) | System prompt defining WHO the agent is |
| `register_feature(feature)` | Add functionality to agent at runtime |
| `unregister_feature(name)` | Remove feature from agent |
| `has_feature(name)` | Check if feature exists |
| `execute_feature(name, **kwargs)` | Run specific feature |
| `execute_all(**kwargs)` | Run all registered features |
| `complete(user_message, ...)` | Simple completion using agent persona |
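The last three methods in the table are not shown in the `BaseAgent` snippet above. A minimal, self-contained sketch of how `unregister_feature`, `has_feature`, and `execute_all` could behave (the `SketchAgent` and `EchoFeature` names are hypothetical stand-ins, not the real `app/agents/base.py`):

```python
import asyncio
from typing import Any


class SketchAgent:
    """Hypothetical sketch of the table's helper methods; the real
    BaseAgent in app/agents/base.py may differ."""

    def __init__(self):
        self._features: dict[str, Any] = {}

    def register_feature(self, feature) -> None:
        if feature.name in self._features:
            raise ValueError(f"Feature '{feature.name}' already registered")
        self._features[feature.name] = feature

    def unregister_feature(self, name: str) -> None:
        if name not in self._features:
            raise KeyError(f"Feature '{name}' not registered")
        del self._features[name]

    def has_feature(self, name: str) -> bool:
        return name in self._features

    async def execute_all(self, **kwargs) -> dict[str, Any]:
        # Run every registered feature; results keyed by feature name.
        return {
            name: await feature.execute(**kwargs)
            for name, feature in self._features.items()
        }


class EchoFeature:
    """Trivial stand-in feature for the sketch."""
    name = "echo"

    async def execute(self, **kwargs) -> dict:
        return {"echo": kwargs.get("text", "")}


agent = SketchAgent()
agent.register_feature(EchoFeature())
print(asyncio.run(agent.execute_all(text="hi")))  # {'echo': {'echo': 'hi'}}
agent.unregister_feature("echo")
print(agent.has_feature("echo"))  # False
```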
AgentFeature Protocol
Definition
@runtime_checkable
class AgentFeature(Protocol):
"""Defines what makes a valid agent feature."""
@property
def name(self) -> str:
"""Unique identifier."""
...
@property
def prompt(self) -> str:
"""Feature prompt template (can use placeholders)."""
...
async def execute(
self,
provider: LLMProvider,
system_prompt: str,
**kwargs,
) -> Any:
"""
Execute the feature.
Args:
provider: LLM provider to use
system_prompt: Agent's system prompt (to combine with feature prompt)
**kwargs: Feature-specific arguments
Returns:
Feature-specific result (dict, str, object, etc.)
"""
...
Feature Implementation Pattern
from typing import Protocol, Any
from app.providers import LLMProvider, Message
class MyFeature:
"""Example feature implementation."""
@property
def name(self) -> str:
return "my_feature"
@property
def prompt(self) -> str:
return """
Analyze the following text and identify key themes.
Return a JSON object:
{
"themes": ["theme1", "theme2"],
"confidence": 0.95
}
"""
async def execute(
self,
provider: LLMProvider,
system_prompt: str,
text: str = "", # Feature-specific kwarg
**kwargs,
) -> dict[str, Any]:
"""Execute theme analysis."""
messages = [
Message(role="system", content=system_prompt),
Message(role="user", content=f"Text:\n{text}"),
]
response = await provider.complete(
messages=messages,
json_mode=True, # Request structured output
)
import json
return json.loads(response.content)
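Because the protocol is decorated with `@runtime_checkable`, conformance can be checked with `isinstance` at registration time. A self-contained sketch (the `Named` protocol and both feature classes are illustrative stand-ins; note that `isinstance` against a runtime protocol only verifies member presence, not signatures):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Named(Protocol):
    # Minimal stand-in for AgentFeature: runtime_checkable lets
    # isinstance() verify that these members exist on the object.
    @property
    def name(self) -> str: ...

    async def execute(self, **kwargs) -> Any: ...


class GoodFeature:
    @property
    def name(self) -> str:
        return "good"

    async def execute(self, **kwargs) -> Any:
        return {}


class BadFeature:
    pass  # no name, no execute


print(isinstance(GoodFeature(), Named))  # True
print(isinstance(BadFeature(), Named))   # False
```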
Provider Injection
LLMProvider Interface
class LLMProvider(Protocol):
"""LLM provider interface."""
async def complete(
self,
messages: list[Message],
temperature: float = 0.7,
json_mode: bool = False,
timeout: int = 30,
) -> LLMResponse:
"""Generate completion from messages."""
...
Failover Chain
OCapistaine implements automatic failover:
from app.providers.failover import ProviderWithFailover
# Primary: Ollama (local, free)
# Fallback: OpenAI, Claude, Mistral, Gemini
provider = ProviderWithFailover(
primary="ollama",
failover_chain=["openai", "claude", "mistral", "gemini"],
)
# Or specify priority order
provider = ProviderWithFailover(
primary="claude", # Try Claude first
failover_chain=["gemini", "openai"], # Then these
)
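The internals of `ProviderWithFailover` are not shown here; a minimal sketch of the try-in-order loop it implies (class and provider names are hypothetical, not the real `app/providers/failover.py`):

```python
import asyncio


class FailoverSketch:
    """Illustrative failover loop (not the real ProviderWithFailover).
    Providers are any objects exposing an async complete() method,
    ordered primary-first."""

    def __init__(self, providers: list):
        self._providers = providers

    async def complete(self, messages, **kwargs):
        last_error: Exception | None = None
        for provider in self._providers:
            try:
                return await provider.complete(messages, **kwargs)
            except Exception as e:
                # str(e) can be empty (e.g. a read timeout), so keep
                # the exception type for the final error message.
                last_error = e
        raise RuntimeError(
            f"All providers failed; last error: {type(last_error).__name__}"
        )


class FlakyProvider:
    async def complete(self, messages, **kwargs):
        raise TimeoutError()


class StableProvider:
    async def complete(self, messages, **kwargs):
        return "ok"


chain = FailoverSketch([FlakyProvider(), StableProvider()])
print(asyncio.run(chain.complete([])))  # ok
```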
Provider Configuration
from app.providers import get_provider
# Get default provider (configured in env)
provider = get_provider()
# Get specific provider
provider = get_provider("claude")
provider = get_provider("gemini")
provider = get_provider("ollama")
# Inject into agent
agent = Forseti(provider=provider)
Building Agents
Step 1: Define Persona
from app.agents.base import BaseAgent
class MyAgent(BaseAgent):
@property
def persona_prompt(self) -> str:
return """
You are an expert analyst for civic participation.
Your role is to understand citizen contributions and classify them.
Be precise, fair, and transparent in your analysis.
Always cite the rules you're applying.
"""
Step 2: Create Features
import json

from app.providers import LLMProvider, Message

class ValidationFeature:
@property
def name(self) -> str:
return "validate"
@property
def prompt(self) -> str:
return """
Check if this contribution complies with the charter.
Return:
{
"valid": boolean,
"reason": string,
"violations": [list of violated rules]
}
"""
async def execute(
self,
provider: LLMProvider,
system_prompt: str,
contribution: str = "",
**kwargs,
) -> dict:
messages = [
Message(role="system", content=system_prompt),
Message(role="user", content=f"Contribution:\n{contribution}"),
]
response = await provider.complete(messages=messages, json_mode=True)
return json.loads(response.content)
class ClassificationFeature:
@property
def name(self) -> str:
return "classify"
@property
def prompt(self) -> str:
return """
Classify this contribution into one of: environment, transport, social, culture, other.
Return:
{
"category": string,
"confidence": float
}
"""
async def execute(
self,
provider: LLMProvider,
system_prompt: str,
contribution: str = "",
**kwargs,
    ) -> dict:
        messages = [
            Message(role="system", content=system_prompt),
            Message(role="user", content=f"Contribution:\n{contribution}"),
        ]
        response = await provider.complete(messages=messages, json_mode=True)
        return json.loads(response.content)
Step 3: Register Features
class MyAgent(BaseAgent):
def __init__(self, provider: LLMProvider | None = None):
super().__init__(provider=provider)
# Register features
self.register_feature(ValidationFeature())
self.register_feature(ClassificationFeature())
@property
def persona_prompt(self) -> str:
return "..."
Step 4: Use the Agent
# Initialize agent
agent = MyAgent(provider=get_provider("claude"))
# Execute single feature
validation_result = await agent.execute_feature(
"validate",
contribution="Je propose d'améliorer l'éclairage du port"
)
# Execute all features
results = await agent.execute_all(
contribution="Je propose d'améliorer l'éclairage du port"
)
# results = {
# "validate": {"valid": True, "reason": "..."},
# "classify": {"category": "environment", "confidence": 0.95}
# }
Feature Composition
Multiple Agents Sharing Features
Features can be reused across agents:
# Feature definition (reusable)
class TokenCounterFeature:
    @property
    def name(self) -> str:
        return "count_tokens"

    @property
    def prompt(self) -> str:
        return ""  # No LLM call needed for this feature

    async def execute(self, provider, system_prompt, text="", **kwargs):
        return {"tokens": len(text.split())}
# Agent 1: Forseti (charter validation)
class Forseti(BaseAgent):
def __init__(self):
super().__init__()
self.register_feature(ValidationFeature())
self.register_feature(ClassificationFeature())
self.register_feature(TokenCounterFeature()) # Shared
# Agent 2: RAG Agent (document analysis)
class RAGAgent(BaseAgent):
def __init__(self):
super().__init__()
self.register_feature(RetrievalFeature())
self.register_feature(TokenCounterFeature()) # Shared
Nested Composition
class ComposedAgent(BaseAgent):
def __init__(self):
super().__init__()
self.forseti = Forseti() # Nested agent
self.rag = RAGAgent() # Another nested agent
async def orchestrate(self, contribution: str):
"""Run multiple agents sequentially."""
# 1. Validate with Forseti
validation = await self.forseti.execute_feature(
"validate",
contribution=contribution
)
if not validation["valid"]:
return {"status": "rejected", "reason": validation["reason"]}
# 2. Retrieve context with RAG
context = await self.rag.execute_feature(
"retrieve",
query=contribution
)
return {
"status": "accepted",
"context": context,
"validation": validation
}
Testing
Mock Provider
from app.providers import LLMProvider, LLMResponse, Message
class MockProvider(LLMProvider):
"""Mock provider for testing."""
def __init__(self, response_map: dict[str, str] | None = None):
self.response_map = response_map or {}
async def complete(
self,
messages: list[Message],
**kwargs,
) -> LLMResponse:
"""Return predefined response based on user message."""
user_message = next(
(m.content for m in messages if m.role == "user"),
"unknown"
)
response = self.response_map.get(
user_message,
'{"result": "default"}'
)
return LLMResponse(
content=response,
tokens=len(response.split())
)
# Usage
mock_provider = MockProvider({
"Contribution:\nTest contribution": '{"valid": true, "reason": "OK"}'
})
agent = MyAgent(provider=mock_provider)
result = await agent.execute_feature("validate", contribution="Test contribution")
assert result["valid"] is True
Feature Unit Tests
import pytest
from app.agents.base import BaseAgent
@pytest.mark.asyncio
async def test_validation_feature():
"""Test validation feature in isolation."""
mock_provider = MockProvider({
"Contribution:\nInvalid": '{"valid": false, "reason": "Violates rule 5"}'
})
agent = MyAgent(provider=mock_provider)
result = await agent.execute_feature(
"validate",
contribution="Invalid"
)
assert result["valid"] is False
assert "rule 5" in result["reason"]
@pytest.mark.asyncio
async def test_agent_feature_registration():
"""Test that features are registered correctly."""
agent = MyAgent()
assert agent.has_feature("validate")
assert agent.has_feature("classify")
assert not agent.has_feature("unknown")
@pytest.mark.asyncio
async def test_provider_injection():
"""Test provider can be injected."""
custom_provider = MockProvider()
agent = MyAgent(provider=custom_provider)
assert agent._provider is custom_provider
Examples
Example 1: Forseti Agent (Charter Validation)
Location: app/agents/forseti/agent.py
from app.agents.base import BaseAgent
from app.agents.forseti.features import (
ComplianceFeature,
ClassificationFeature,
AnonymizationFeature,
)
class Forseti(BaseAgent):
"""Charter compliance validation agent."""
def __init__(self, provider: LLMProvider | None = None):
super().__init__(provider=provider)
# Register features
self.register_feature(ComplianceFeature())
self.register_feature(ClassificationFeature())
self.register_feature(AnonymizationFeature())
@property
def persona_prompt(self) -> str:
return """
You are Forseti, the civic participation charter guardian.
Your role is to:
1. Validate contributions against the participation charter
2. Classify contributions by theme (environment, transport, etc.)
3. Anonymize sensitive information when needed
Be fair, transparent, and cite the charter rules you apply.
"""
# Usage
forseti = Forseti(provider=get_provider("claude"))
# Validate a contribution
result = await forseti.execute_feature(
"compliance",
contribution="Je propose l'amélioration de l'éclairage du port"
)
print(result)
# {
# "compliant": True,
# "issues": [],
# "charter_rules_checked": ["rule_1", "rule_2", "rule_3"]
# }
Example 2: Custom RAG Agent
from app.agents.base import BaseAgent
class RAGAgent(BaseAgent):
"""Retrieval-Augmented Generation agent."""
def __init__(self, vector_store, provider: LLMProvider | None = None):
super().__init__(provider=provider)
self.vector_store = vector_store
self.register_feature(RetrievalFeature(vector_store))
self.register_feature(GenerationFeature())
@property
def persona_prompt(self) -> str:
return """
You are a knowledgeable assistant for municipal governance.
Use provided documents to answer questions accurately.
Cite your sources.
"""
# Usage
rag = RAGAgent(
vector_store=vector_db,
provider=get_provider("ollama") # Local LLM
)
answer = await rag.execute_feature(
"generate",
query="What are the current traffic policies?",
documents=[...] # Retrieved context
)
Design Principles
1. Separation of Concerns
- BaseAgent: WHO (identity, persona)
- Features: WHAT (specific tasks)
- Provider: HOW (LLM API implementation)
# ✅ Good: Each layer has single responsibility
agent = Forseti(provider=get_provider("claude"))
# ❌ Avoid: Mixed responsibilities
class BadAgent:
async def validate_and_call_api(self): # Multiple concerns
...
2. Dependency Injection
- Agent takes provider as dependency
- Features receive provider at execute time
- Enables testing with mock providers
# ✅ Good: Injected dependency
agent = Forseti(provider=mock_provider)
# ❌ Avoid: Creating dependency internally
class BadAgent:
def __init__(self):
self.provider = get_provider() # Hard to test
3. Fail Safe with Fallbacks
- Primary provider preference specified
- Automatic fallback to alternative providers
- Always has a working provider (even if degraded)
# ✅ Good: With failover
provider = ProviderWithFailover(
primary="ollama",
failover_chain=["gemini", "openai"]
)
# ❌ Avoid: Single provider
provider = OllamaProvider() # Fails if Ollama down
4. Observable by Default
- Features work with Opik tracing
- Structured logging inherited from BaseAgent
- Metrics collected automatically
# ✅ Good: Traced and observable
@OpikTracer.track
async def process(contribution: str):
result = await forseti.execute_feature("validate", contribution=contribution)
logger.info("Validation complete", extra={"result": result})
return result
5. Provider-Aware Chunking
Input size is a cross-cutting concern between Features (WHAT) and the Provider (HOW). A feature that works within Claude's 200k-token context will time out on a local 7B model. The chunking strategy must adapt to the provider's capabilities.
The Problem
Provider Constraint Impact on Feature
───────────────────── ─────────────────────
Context window (tokens) → Max input per call
Inference speed (tok/s) → Timeout budget
JSON mode overhead → Structured output adds latency
Model size (7B vs 70B) → Quality vs speed tradeoff
A 58k character document:
- Claude/Gemini: 1 chunk, completes in seconds
- Ollama deepseek-r1:7b: 8 chunks of 8k chars, ~2 min each ≈ 16 min total
- Ollama deepseek-r1:7b with 15k chunks: times out on every chunk (120 s budget exhausted)
Chunk Size Guidelines
| Provider Class | Recommended Chunk Size | Timeout | Notes |
|---|---|---|---|
| Cloud APIs (Claude, Gemini, OpenAI) | 15-30k chars | 30-60s | Large context, fast inference |
| Local large models (Ollama 70B+) | 10-15k chars | 180s | Good context, slower inference |
| Local small models (Ollama 7B-14B) | 6-8k chars | 120s | Limited context, JSON mode adds overhead |
| Local tiny models (Ollama <7B) | 3-5k chars | 120s | Minimal context, quality degrades on long input |
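As a sanity check on the table, chunk count follows directly from document length, chunk size, and overlap. The helper below is illustrative only, assuming a 500-char overlap:

```python
import math


def estimate_chunks(doc_chars: int, chunk_size: int, overlap: int = 500) -> int:
    """Rough chunk count: each chunk after the first advances by
    (chunk_size - overlap) characters. Illustrative helper only."""
    if doc_chars <= chunk_size:
        return 1
    step = chunk_size - overlap
    return 1 + math.ceil((doc_chars - chunk_size) / step)


# The 58k-character document from the example above, on a 7B local
# model with 8k chunks:
n = estimate_chunks(58_000, 8_000)
print(n, "chunks ->", n * 2, "minutes at ~2 min/chunk")  # 8 chunks -> 16 minutes at ~2 min/chunk
```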
Implementation Pattern
Features that process documents should accept chunk configuration rather than hardcoding it:
# In feature or processor
CHUNK_SIZE = 8000 # Safe default for smallest supported provider
CHUNK_OVERLAP = 500 # Paragraph boundary overlap
chunks = split_into_chunks(text, CHUNK_SIZE, CHUNK_OVERLAP)
for i, chunk in enumerate(chunks):
result = await provider.complete(messages, json_mode=True)
Key rules:
- Default to the smallest provider you support (currently 7B local models = 8k chunks)
- Overlap at paragraph boundaries to avoid cutting mid-sentence
- Log chunk count and size so timeouts are diagnosable
- Empty `str(exception)` trap: `httpx.ReadTimeout` stringifies to `""`; always log `str(e) or type(e).__name__`
- Each chunk = one LLM call: if a chunk times out, the feature should log and continue with the remaining chunks rather than failing entirely
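`split_into_chunks` is referenced in the pattern above but not defined; a minimal sketch that honors the paragraph-boundary rule (illustrative, not the framework's actual helper):

```python
def split_into_chunks(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into ~chunk_size pieces, preferring paragraph
    boundaries for the cut so sentences are not severed."""
    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        if end < len(text):
            # Prefer to cut at the last paragraph break inside the window.
            cut = text.rfind("\n\n", start, end)
            if cut > start:
                end = cut
        chunks.append(text[start:end])
        if end >= len(text):
            break
        # Step back by `overlap` so context spans the chunk boundary.
        start = max(end - overlap, start + 1)
    return chunks


paras = "\n\n".join(f"Paragraph {i}." for i in range(200))
chunks = split_into_chunks(paras, chunk_size=500, overlap=50)
print(len(chunks) > 1, all(len(c) <= 500 for c in chunks))  # True True
```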
Chunking and the ABC Layers
BaseAgent (WHO)
│
├── Feature (WHAT)
│ └── Chunking lives HERE
│ - Split input into provider-safe sizes
│ - Deduplicate results across chunks
│ - Aggregate partial results
│
└── Provider (HOW)
└── Constraints come from HERE
- Context window, timeout, speed
- Provider.complete() is chunk-unaware
The Provider layer stays chunk-unaware — it processes single requests. The Feature layer owns chunking because it understands how to split and reassemble results for its specific task (theme extraction deduplicates, anonymization applies mappings globally, validation checks each chunk independently).
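The Feature-layer responsibilities described above (one LLM call per chunk, deduplicate across chunks, tolerate per-chunk failures) can be sketched as follows; `provider.analyze` and both helper names are hypothetical:

```python
import asyncio


async def extract_themes_chunked(provider, chunks: list[str]) -> dict:
    """Feature-layer aggregation sketch: one call per chunk, dedupe
    themes across chunks, skip (but count) failed chunks.
    provider.analyze(chunk) is a hypothetical call returning a list
    of theme strings."""
    themes: list[str] = []
    seen: set[str] = set()
    failed = 0
    for i, chunk in enumerate(chunks):
        try:
            for theme in await provider.analyze(chunk):
                if theme not in seen:
                    seen.add(theme)
                    themes.append(theme)
        except Exception as e:
            # str(e) may be empty (e.g. httpx.ReadTimeout): log the type too.
            print(f"chunk {i} failed: {str(e) or type(e).__name__}")
            failed += 1
    return {"themes": themes, "chunks": len(chunks), "failed": failed}


class FakeProvider:
    """Stand-in provider for the sketch."""
    async def analyze(self, chunk: str) -> list[str]:
        if "boom" in chunk:
            raise TimeoutError()
        return chunk.split()


result = asyncio.run(
    extract_themes_chunked(FakeProvider(), ["a b", "b c", "boom"])
)
print(result)  # {'themes': ['a', 'b', 'c'], 'chunks': 3, 'failed': 1}
```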
References
- OCapistaine Forseti: app/agents/forseti/
- Provider Interface: app/providers/base.py
- Opik Tracing: app/agents/tracing/opik.py
Last Updated: 2026-03-04 Branch: dev Reference Implementation: OCapistaine Forseti Agent