# Process Orchestration

Coordinating multi-service and multi-step workflows in production systems.

This guide documents how to design and manage complex workflows across multiple services, with a focus on async-first coordination, state management, and fault tolerance.
## Overview
Process Orchestration is the art of coordinating multiple services and processing steps into a cohesive workflow. This is distinct from:
- Task Scheduling (APScheduler patterns) - When to run tasks
- Application Lifecycle (Startup/shutdown) - How services initialize
- Scripts Standardization (Script structure) - How to automate tasks
Instead, Process Orchestration answers a different question: how do multiple services and steps work together?
## Core Concepts

### 1. Workflows (Multi-Step Processes)

A workflow is a sequence of steps that process data through multiple stages:

```
Input → Step 1 → Step 2 → Step 3 → Output
           ↓        ↓        ↓
         State    State    State
```

Example: a document processing workflow

```
Raw Doc → Chunking → Embedding → Storage → Dataset Creation
```
Each step can:
- Transform data
- Call external APIs
- Store intermediate results
- Fail and require retry
### 2. Composition Patterns

Workflows are composed by chaining processors:

```python
workflow = Workflow()
workflow.add_step(ChunkingProcessor())
workflow.add_step(EmbeddingProcessor())
workflow.add_step(StorageProcessor())

result = await workflow.execute(raw_document)
```
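A minimal, self-contained sketch of what such a `Workflow` class could look like. The class body and the `process` method name are illustrative assumptions, not OCapistaine's actual API:

```python
import asyncio


class Workflow:
    """Chains processors; each step receives the previous step's output."""

    def __init__(self):
        self._steps = []

    def add_step(self, processor):
        self._steps.append(processor)

    async def execute(self, data):
        # Run every step in order, threading the result through
        for step in self._steps:
            data = await step.process(data)
        return data


class UppercaseProcessor:
    """Toy processor standing in for chunking/embedding/storage."""

    async def process(self, data: str) -> str:
        return data.upper()


class ExclaimProcessor:
    async def process(self, data: str) -> str:
        return data + "!"


workflow = Workflow()
workflow.add_step(UppercaseProcessor())
workflow.add_step(ExclaimProcessor())
result = asyncio.run(workflow.execute("hello"))  # → "HELLO!"
```

The key design property is that the workflow owns the ordering while each processor stays ignorant of its neighbors.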
### 3. State Management

Intermediate results must be stored and passed between steps:

```python
# Each step receives a context carrying previous results
class ProcessingContext:
    input_data: Any
    intermediate_results: dict[str, Any]  # From previous steps
    metadata: dict  # Tracking info
```
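As a runnable sketch, the same context can be expressed as a dataclass (field names follow the snippet above; the mutable defaults are an assumption):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ProcessingContext:
    """Carries data and bookkeeping through every step of a workflow."""

    input_data: Any
    intermediate_results: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)


ctx = ProcessingContext(input_data="raw document")
ctx.intermediate_results["chunks"] = ["raw", "document"]
ctx.metadata["document_id"] = "doc_1"
```

Using `field(default_factory=dict)` avoids the classic shared-mutable-default bug when many contexts are created.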
### 4. Fault Tolerance

Workflows must handle failures gracefully:

```python
# Retry policy
@retry(max_attempts=3, backoff=exponential)
async def step_with_retry(context: ProcessingContext):
    ...

# Idempotency (safe to run multiple times)
if context.metadata.get("step_completed"):
    return context.intermediate_results["step_result"]
```
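Putting the idempotency guard into a runnable form (a toy step over a plain dict; the key names mirror the fragment above, and the `run_count` counter is added only to demonstrate that re-runs skip the work):

```python
import asyncio


async def expensive_step(context: dict) -> str:
    """Idempotent step: returns the cached result on re-runs."""
    if context["metadata"].get("step_completed"):
        return context["intermediate_results"]["step_result"]

    result = "computed"  # stand-in for real work (API call, embedding, ...)
    context["intermediate_results"]["step_result"] = result
    context["metadata"]["step_completed"] = True
    context["metadata"]["run_count"] = context["metadata"].get("run_count", 0) + 1
    return result


ctx = {"intermediate_results": {}, "metadata": {}}
first = asyncio.run(expensive_step(ctx))
second = asyncio.run(expensive_step(ctx))  # cached: the work runs only once
```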
## Orchestration Patterns

### Pattern 1: Sequential Orchestration (Local)

Use when: steps are tightly coupled and run in the same process.

```python
# OCapistaine example: contribution analysis workflow
workflow = ContributionAnalysisWorkflow()

async def analyze_contribution(text: str):
    context = ProcessingContext(input_data=text)

    # Step 1: Validation
    validated = await workflow.validate(context)
    context.intermediate_results["validation"] = validated

    # Step 2: Classification
    classified = await workflow.classify(context)
    context.intermediate_results["classification"] = classified

    # Step 3: Evaluation
    evaluated = await workflow.evaluate(context)
    context.intermediate_results["evaluation"] = evaluated

    return context
```
Pros:
- Simple, direct control
- Easy to debug
- No inter-service coordination
Cons:
- Can't scale steps independently
- Single failure blocks entire workflow
- Requires same runtime environment
### Pattern 2: Event-Driven Orchestration (Async)

Use when: steps can run independently with event notifications.

```python
# Services connected via a message queue (Redis, Kafka)

# Service A: Document Chunker
@app.post("/process")
async def chunk_document(doc_id: str):
    chunks = await chunk(doc_id)
    # Publish event
    await event_bus.publish("document_chunked", {
        "doc_id": doc_id,
        "chunk_count": len(chunks)
    })
    return {"status": "chunked"}

# Service B: Embedding Generator (listens for "document_chunked")
@event_bus.on("document_chunked")
async def embed_chunks(event: dict):
    doc_id = event["doc_id"]
    embeddings = await generate_embeddings(doc_id)
    await event_bus.publish("chunks_embedded", {
        "doc_id": doc_id,
        "embedding_count": len(embeddings)
    })

# Service C: Vector Store (listens for "chunks_embedded")
@event_bus.on("chunks_embedded")
async def store_vectors(event: dict):
    doc_id = event["doc_id"]
    await store_in_vector_db(doc_id)
```
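The `event_bus.publish`/`event_bus.on` surface can be mimicked with a tiny in-process bus for local testing. This is an illustrative sketch only; a real deployment would back it with Redis Streams or Kafka:

```python
import asyncio
from collections import defaultdict


class EventBus:
    """Minimal in-process pub/sub: handlers run when an event is published."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name):
        def decorator(handler):
            self._handlers[event_name].append(handler)
            return handler
        return decorator

    async def publish(self, event_name, payload):
        for handler in self._handlers[event_name]:
            await handler(payload)


event_bus = EventBus()
seen = []  # record of handler invocations, for demonstration


@event_bus.on("document_chunked")
async def embed_chunks(event):
    seen.append(("embed", event["doc_id"]))
    # Embedding done: notify the next stage
    await event_bus.publish("chunks_embedded", {"doc_id": event["doc_id"]})


@event_bus.on("chunks_embedded")
async def store_vectors(event):
    seen.append(("store", event["doc_id"]))


asyncio.run(event_bus.publish("document_chunked", {"doc_id": "doc_42"}))
```

Note that this in-process version is synchronous in effect (handlers run inline); the durability and fault isolation claimed for the pattern come from the external broker, not from this shape.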
Pros:
- Highly scalable (services run independently)
- Fault isolated (one failure doesn't block others)
- Services can be in different languages/frameworks
- Easy to add new services to pipeline
Cons:
- More complex debugging (distributed trace needed)
- Eventual consistency (temporary inconsistency possible)
- Harder to test (requires message setup)
### Pattern 3: Workflow Orchestrator Service

Use when: you need both scalability and centralized control.

```python
# A dedicated orchestrator coordinates everything
class WorkflowOrchestrator:
    async def execute_workflow(self, workflow_id: str):
        # Get the workflow definition
        workflow = await load_workflow(workflow_id)

        # Execute steps in order, handling failures
        for step in workflow.steps:
            try:
                result = await self.call_service(
                    step.service_endpoint,
                    step.input_data
                )
                # Store the result
                await self.store_step_result(
                    workflow_id,
                    step.id,
                    result
                )
            except StepFailure as e:
                # Handle the failure
                await self.handle_failure(workflow_id, step.id, e)
                break

# Client submits a workflow request
@app.post("/workflows")
async def submit_workflow(workflow: WorkflowDefinition):
    workflow_id = await orchestrator.start_workflow(workflow)
    return {"workflow_id": workflow_id}

# Query workflow status
@app.get("/workflows/{workflow_id}/status")
async def get_status(workflow_id: str):
    status = await orchestrator.get_status(workflow_id)
    return status
```
Pros:
- Centralized control and monitoring
- Easy to add retries, timeouts, error handling
- Good visibility (single dashboard for all workflows)
- Works with both local and remote services
Cons:
- The orchestrator can become a bottleneck
- A single point of failure unless designed for redundancy
- Requires dedicated service
## OCapistaine Implementation

OCapistaine uses Sequential Orchestration for contribution analysis and Event-Driven Orchestration for document processing (via scheduler tasks).

### Processor-Based Workflows

Location: `app/processors/workflows/`

OCapistaine defines reusable processor classes that implement the workflow pattern:
```python
# Example: contribution analysis workflow
from app.processors.workflows import BaseProcessor, ProcessingContext

class ContributionValidationProcessor(BaseProcessor):
    """Validates citizen contributions against charter rules."""

    async def execute(self, context: ProcessingContext) -> ProcessingContext:
        # Run validation using the Forseti agent
        result = await self.agent.validate(context.input_data)

        # Store the result in the context
        context.intermediate_results["validation"] = result
        context.metadata["validated_at"] = datetime.now()

        return context
```
### Step Composition

Steps are chained together in tasks:

```python
# app/services/tasks/task_contributions_analysis.py
async def task_contributions_analysis():
    # Load contributions
    contributions = await load_contributions()

    # Create the workflow
    workflow = ContributionAnalysisWorkflow(
        processors=[
            ValidationProcessor(),
            ClassificationProcessor(),
            EvaluationProcessor(),
        ]
    )

    # Execute for each contribution
    for contrib in contributions:
        context = ProcessingContext(input_data=contrib)
        result = await workflow.execute(context)

        # Store the final result
        await save_contribution_result(result)
```
### Asynchronous Task Chains

The scheduler orchestrates multiple tasks in sequence:

```python
# app/services/scheduler/__init__.py
async def orchestrate_task_chain():
    """
    Daily workflow:
    1. Load contributions (task_load_contributions)
    2. Analyze contributions (task_contributions_analysis)
    3. Evaluate accuracy (task_opik_evaluate)
    4. Generate report (task_generate_report)
    """
    chain = TaskChain()
    chain.add(task_load_contributions)
    chain.add(task_contributions_analysis)
    chain.add(task_opik_evaluate)
    chain.add(task_generate_report)

    result = await chain.execute_sequentially(
        continue_on_error=False,  # Stop on first failure
        timeout=600  # 10 min timeout
    )
    return result
```
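A `TaskChain` with this interface could be sketched roughly as follows. This is an assumption for illustration, not the scheduler's actual implementation:

```python
import asyncio


class TaskChain:
    """Runs async tasks in order; optionally stops at the first failure."""

    def __init__(self):
        self._tasks = []

    def add(self, task):
        self._tasks.append(task)

    async def execute_sequentially(self, continue_on_error=False, timeout=600):
        results = []

        async def _run():
            for task in self._tasks:
                try:
                    results.append(await task())
                except Exception as exc:
                    results.append(exc)
                    if not continue_on_error:
                        break  # abort the rest of the chain

        # The timeout covers the whole chain, not individual tasks
        await asyncio.wait_for(_run(), timeout=timeout)
        return results


async def load():
    return "loaded"


async def analyze():
    raise RuntimeError("boom")


async def report():
    return "report"


chain = TaskChain()
chain.add(load)
chain.add(analyze)
chain.add(report)
# With continue_on_error=False the chain stops after the failing task
results = asyncio.run(chain.execute_sequentially(continue_on_error=False))
```

Collecting exceptions into the result list (rather than raising) lets the caller decide how to report partial progress.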
## Designing Workflows

### Step 1: Define the Input and Output

```python
class WorkflowInput:
    raw_document: str
    source: str  # "firecrawl", "github", "facebook"
    metadata: dict

class WorkflowOutput:
    document_id: str
    extracted_themes: list[str]
    contributions: list[ContributionData]
    warnings: list[str]
```
### Step 2: Design Processing Steps
Each step should be:
- Idempotent - Safe to run multiple times
- Isolated - Depends only on context input
- Observable - Logs and traces intermediate state
- Fault-tolerant - Handles errors gracefully
```python
class DocumentChunkingProcessor(BaseProcessor):
    async def execute(self, context: ProcessingContext):
        document = context.input_data

        # Check if already completed (idempotency)
        if "chunks" in context.intermediate_results:
            return context

        # Process
        chunks = self.chunk(
            document,
            max_size=15000,
            overlap=500
        )

        # Store the result
        context.intermediate_results["chunks"] = chunks

        # Log for observability
        logger.info(
            "chunks_created",
            extra={
                "document_id": context.metadata.get("document_id"),
                "chunk_count": len(chunks),
                "total_size": sum(len(c) for c in chunks)
            }
        )
        return context
```
### Step 3: Assemble the Workflow

```python
class DocumentProcessingWorkflow:
    def __init__(self):
        self.steps = [
            DocumentChunkingProcessor(),
            EmbeddingProcessor(),
            VectorStoreProcessor(),
            DatasetCreationProcessor(),
        ]

    async def execute(self, document: str):
        context = ProcessingContext(input_data=document)
        for step in self.steps:
            try:
                context = await step.execute(context)
            except ProcessingError as e:
                # Assumes the context carries an error-accumulator list
                context.errors.append(str(e))
                # Decide: continue or stop?
                if step.critical:
                    raise
        return context
```
### Step 4: Test Each Step Independently

```python
# Test a processor in isolation
async def test_chunking_processor():
    processor = DocumentChunkingProcessor()
    context = ProcessingContext(
        input_data="very long document...",
        metadata={"document_id": "test_123"}
    )

    result = await processor.execute(context)

    assert len(result.intermediate_results["chunks"]) > 0
    assert result.metadata["chunking_completed"]
```
## Common Patterns

### Retry with Exponential Backoff

```python
import asyncio
from functools import wraps

def retry(max_attempts=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)  # Exponential
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, base_delay=2)
async def call_external_service():
    result = await api_call()
    return result
```
### Timeout Handling

```python
async def execute_with_timeout(coro, timeout_seconds=30):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        raise ProcessingError(f"Task exceeded {timeout_seconds}s timeout")
```
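`asyncio.wait_for` cancels the awaited coroutine when the limit is hit; a quick self-contained check of that behavior:

```python
import asyncio


async def slow_task():
    await asyncio.sleep(10)  # deliberately longer than the timeout
    return "done"


async def fast_task():
    return "done"


async def main():
    # The fast task completes well within the limit
    ok = await asyncio.wait_for(fast_task(), timeout=0.5)

    # The slow task is cancelled and raises asyncio.TimeoutError
    try:
        await asyncio.wait_for(slow_task(), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out


ok, timed_out = asyncio.run(main())
```

Because cancellation propagates into the coroutine, steps that must not be interrupted mid-write should shield their critical section or clean up in a `finally` block.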
### Error Context Preservation

```python
class ProcessingError(Exception):
    def __init__(self, step_name: str, original_error: Exception, context: dict):
        self.step_name = step_name
        self.original_error = original_error
        self.context = context
        super().__init__(f"{step_name} failed: {original_error}")

# Usage in a processor
try:
    result = await external_service()
except Exception as e:
    raise ProcessingError(
        step_name="embedding_generation",
        original_error=e,
        context={"document_id": doc_id, "attempted_at": datetime.now()}
    )
```
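Exercising the class end to end (the class definition is repeated so the snippet runs standalone; the document id is a placeholder):

```python
from datetime import datetime, timezone


class ProcessingError(Exception):
    """Wraps a step failure while preserving the original error and context."""

    def __init__(self, step_name: str, original_error: Exception, context: dict):
        self.step_name = step_name
        self.original_error = original_error
        self.context = context
        super().__init__(f"{step_name} failed: {original_error}")


try:
    try:
        raise ConnectionError("embedding API unreachable")
    except Exception as exc:
        raise ProcessingError(
            step_name="embedding_generation",
            original_error=exc,
            context={
                "document_id": "doc_7",
                "attempted_at": datetime.now(timezone.utc),
            },
        )
except ProcessingError as err:
    caught = err  # the wrapper keeps step, cause, and context intact
```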
### Conditional Branching

```python
async def execute_workflow_with_branching(context):
    # Step 1: Analyze the content type
    content_type = await analyze_content(context.input_data)
    context.metadata["content_type"] = content_type

    # Step 2: Branch based on type
    if content_type == "document":
        context = await DocumentProcessingWorkflow().execute(context)
    elif content_type == "image":
        context = await ImageProcessingWorkflow().execute(context)
    elif content_type == "video":
        context = await VideoProcessingWorkflow().execute(context)
    else:
        raise ProcessingError(
            step_name="content_type_detection",
            original_error=ValueError(f"Unknown type: {content_type}"),
            context={"content_type": content_type}
        )

    return context
```
## Testing Orchestrated Workflows

### Unit Testing (Individual Processors)

```python
@pytest.mark.asyncio
async def test_validation_processor():
    processor = ValidationProcessor()
    context = ProcessingContext(
        input_data={"text": "Test contribution", "author": "test_user"}
    )

    result = await processor.execute(context)

    assert result.intermediate_results["is_valid"] is True
```
### Integration Testing (Multi-Processor)

```python
@pytest.mark.asyncio
async def test_full_workflow():
    workflow = ContributionAnalysisWorkflow()
    test_contribution = {
        "text": "A citizen contribution",
        "source": "github_issue"
    }

    context = ProcessingContext(input_data=test_contribution)
    result = await workflow.execute(context)

    # Verify the full workflow completed
    assert "validation" in result.intermediate_results
    assert "classification" in result.intermediate_results
    assert "evaluation" in result.intermediate_results
```
### Load Testing

```python
@pytest.mark.asyncio
async def test_workflow_under_load():
    workflow = ContributionAnalysisWorkflow()

    # Simulate 100 contributions
    tasks = [
        workflow.execute(ProcessingContext(input_data=contrib))
        for contrib in generate_test_contributions(100)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Verify all completed and none failed
    assert sum(1 for r in results if not isinstance(r, Exception)) == 100
```
## Monitoring and Observability

### Workflow Metrics

Track at each step:

```python
class WorkflowMetrics:
    step_name: str
    start_time: datetime
    end_time: datetime
    status: str  # "success", "failed", "timeout"
    duration_ms: int
    input_size: int
    output_size: int
    error_message: str | None = None

# Log to the observability platform
async def log_workflow_metrics(metrics: WorkflowMetrics):
    await opik_client.log(
        operation_name=f"workflow_{metrics.step_name}",
        metadata={
            "status": metrics.status,
            "duration_ms": metrics.duration_ms,
            "input_size": metrics.input_size,
        }
    )
```
### Step-by-Step Tracing

```python
@OpikTracer.track
async def execute_processor_with_tracing(processor, context):
    start = time.time()
    try:
        result = await processor.execute(context)
        duration = time.time() - start
        logger.info(f"{processor.__class__.__name__} completed", extra={
            "duration_ms": duration * 1000,
            "output_size": len(str(result))
        })
        return result
    except Exception as e:
        duration = time.time() - start
        logger.error(f"{processor.__class__.__name__} failed", extra={
            "duration_ms": duration * 1000,
            "error": str(e)
        })
        raise
```
## Troubleshooting

### Workflow Hangs

Symptom: Workflow starts but never completes
Causes:
- Deadlock in async operations
- Processor waiting for external service
- Resource exhaustion
Debug:
```python
# Add a timeout and logging
async def execute_with_timeout(workflow):
    try:
        result = await asyncio.wait_for(
            workflow.execute(),
            timeout=600  # 10 min
        )
        return result
    except asyncio.TimeoutError:
        logger.error("Workflow timeout", extra={
            "workflow_id": workflow.id,
            "current_step": workflow.current_step
        })
        raise
```
### Partial Failures
Symptom: Some steps succeed, some fail inconsistently
Causes:
- Processor not idempotent (different results on retry)
- Race conditions with shared state
- External service intermittently fails
Fix:

```python
# Make the processor idempotent (assumes an async Redis client on the context)
async def execute(self, context):
    # Check if already completed
    step_id = f"{self.__class__.__name__}_{context.workflow_id}"
    if await context.redis.exists(f"completed:{step_id}"):
        return context.intermediate_results[self.__class__.__name__]

    # Process
    result = await self.process()
    context.intermediate_results[self.__class__.__name__] = result

    # Mark as completed (expires after 24h)
    await context.redis.set(f"completed:{step_id}", "1", ex=86400)
    return result
```
### Cascading Failures

Symptom: One step's failure causes multiple subsequent steps to fail

Solution: Use an error-handling strategy

```python
class WorkflowErrorStrategy:
    STOP_ON_ERROR = "stop"          # Fail the entire workflow
    CONTINUE_ON_ERROR = "continue"  # Skip the failed step and continue
    RETRY_ON_ERROR = "retry"        # Retry the failed step

# Usage
workflow = Workflow(error_strategy=WorkflowErrorStrategy.CONTINUE_ON_ERROR)
```
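Applying such a strategy inside the step loop might look like this (a sketch covering only the stop/continue strategies; names are illustrative):

```python
import asyncio


class WorkflowErrorStrategy:
    STOP_ON_ERROR = "stop"
    CONTINUE_ON_ERROR = "continue"


async def run_steps(steps, strategy):
    """Run async steps in order, honoring the configured error strategy."""
    completed, errors = [], []
    for name, step in steps:
        try:
            await step()
            completed.append(name)
        except Exception as exc:
            errors.append((name, str(exc)))
            if strategy == WorkflowErrorStrategy.STOP_ON_ERROR:
                break  # fail the rest of the workflow
    return completed, errors


async def ok():
    pass


async def boom():
    raise RuntimeError("step failed")


steps = [("a", ok), ("b", boom), ("c", ok)]
done_stop, _ = asyncio.run(run_steps(steps, WorkflowErrorStrategy.STOP_ON_ERROR))
done_cont, _ = asyncio.run(run_steps(steps, WorkflowErrorStrategy.CONTINUE_ON_ERROR))
```

With stop-on-error only step `a` completes; with continue-on-error the failure of `b` is recorded and `c` still runs.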
## Best Practices

### DO
- ✅ Make processors idempotent - Safe to retry
- ✅ Store intermediate results - Enable recovery
- ✅ Log at each step - Debug failures
- ✅ Set timeouts - Prevent hangs
- ✅ Test processors independently - Isolate issues
- ✅ Use structured logging - Rich context
- ✅ Monitor workflow metrics - Detect issues early
### DON'T
- ❌ Mix sync and async code - Use all async
- ❌ Modify context in-place without saving - Lose state
- ❌ Assume external services are reliable - Always retry
- ❌ Use global state - Makes testing hard
- ❌ Skip error handling - Workflows will fail unpredictably
## References

### Related Patterns
- Task Scheduling - When to run workflows
- Application Lifecycle - Service initialization
- Logging Integration - Observability patterns
- Agents Framework - Composable AI logic
### Tools and Libraries
- APScheduler - Task scheduling
- Celery - Distributed task queue
- Apache Airflow - Workflow orchestration platform
- Temporal - Distributed workflow runtime
- n8n - Visual workflow automation (used in Vaettir)
### Examples

- OCapistaine: `app/processors/workflows/` for processor-based workflows
- Vaettir: `workflows/` for n8n-based orchestration

Last Updated: 2026-02-23
Applies To: All Locki ecosystem projects
Version: 1.0