
Process Orchestration

Coordinating multi-service and multi-step workflows in production systems

This guide documents how to design and manage complex workflows across multiple services, with a focus on async-first coordination, state management, and fault tolerance.


Overview

Process Orchestration is the art of coordinating multiple services and processing steps into a cohesive workflow. Rather than asking how a single service behaves in isolation, it focuses on one question: how do multiple services and steps work together?


Core Concepts

1. Workflows (Multi-Step Processes)

A workflow is a sequence of steps that process data through multiple stages:

Input → Step 1 → Step 2 → Step 3 → Output
            ↓         ↓         ↓
          State     State     State

Example: Document Processing Workflow

Raw Doc → Chunking → Embedding → Storage → Dataset Creation

Each step can:

  • Transform data
  • Call external APIs
  • Store intermediate results
  • Fail and require retry
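The bullets above can be made concrete with a toy step. This is an illustrative sketch, not OCapistaine's actual API: the `ProcessingContext` fields follow the shape described later in this guide, and `UppercaseStep` is a hypothetical transform.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class ProcessingContext:
    """Carries the input plus intermediate results between steps."""
    input_data: Any
    intermediate_results: dict[str, Any] = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)

class UppercaseStep:
    """Toy step: transforms data and stores an intermediate result."""
    async def execute(self, context: ProcessingContext) -> ProcessingContext:
        context.intermediate_results["uppercase"] = context.input_data.upper()
        return context

ctx = asyncio.run(UppercaseStep().execute(ProcessingContext(input_data="raw doc")))
print(ctx.intermediate_results["uppercase"])  # RAW DOC
```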

2. Composition Patterns

Workflows are composed by chaining processors:

workflow = Workflow()
workflow.add_step(ChunkingProcessor())
workflow.add_step(EmbeddingProcessor())
workflow.add_step(StorageProcessor())
result = await workflow.execute(raw_document)
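The `Workflow` class used above is not defined in this guide; one minimal sketch of that chaining API, using plain async callables in place of processor objects, could look like this:

```python
import asyncio
from typing import Any, Awaitable, Callable

class Workflow:
    """Minimal sketch: each step's output becomes the next step's input."""

    def __init__(self) -> None:
        self._steps: list[Callable[[Any], Awaitable[Any]]] = []

    def add_step(self, step: Callable[[Any], Awaitable[Any]]) -> None:
        self._steps.append(step)

    async def execute(self, data: Any) -> Any:
        for step in self._steps:
            data = await step(data)
        return data

async def double(x: int) -> int:
    return x * 2

async def increment(x: int) -> int:
    return x + 1

wf = Workflow()
wf.add_step(double)
wf.add_step(increment)
result = asyncio.run(wf.execute(5))  # 11
```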

3. State Management

Intermediate results must be stored and passed between steps:

# Each step receives context with previous results
class ProcessingContext:
    input_data: Any
    intermediate_results: dict[str, Any]  # From previous steps
    metadata: dict                        # Tracking info

4. Fault Tolerance

Workflows must handle failures gracefully:

# Retry policy
@retry(max_attempts=3, backoff=exponential)
async def step_with_retry(context: ProcessingContext):
    ...

# Idempotency (safe to run multiple times)
if context.metadata.get("step_completed"):
    return context.intermediate_results["step_result"]

Orchestration Patterns

Pattern 1: Sequential Orchestration (Local)

Use when: Steps are tightly coupled and run in the same process

# OCapistaine example: Contribution analysis workflow
workflow = ContributionAnalysisWorkflow()

async def analyze_contribution(text: str):
    context = ProcessingContext(input_data=text)

    # Step 1: Validation
    validated = await workflow.validate(context)
    context.intermediate_results["validation"] = validated

    # Step 2: Classification
    classified = await workflow.classify(context)
    context.intermediate_results["classification"] = classified

    # Step 3: Evaluation
    evaluated = await workflow.evaluate(context)
    context.intermediate_results["evaluation"] = evaluated

    return context

Pros:

  • Simple, direct control
  • Easy to debug
  • No inter-service coordination

Cons:

  • Can't scale steps independently
  • Single failure blocks entire workflow
  • Requires same runtime environment

Pattern 2: Event-Driven Orchestration (Async)

Use when: Steps can run independently with event notifications

# Services connected via a message queue (Redis, Kafka)

# Service A: Document Chunker
@app.post("/process")
async def chunk_document(doc_id: str):
    chunks = await chunk(doc_id)
    # Publish event
    await event_bus.publish("document_chunked", {
        "doc_id": doc_id,
        "chunk_count": len(chunks)
    })
    return {"status": "chunked"}

# Service B: Embedding Generator (listens for "document_chunked")
@event_bus.on("document_chunked")
async def embed_chunks(event: dict):
    doc_id = event["doc_id"]
    embeddings = await generate_embeddings(doc_id)
    await event_bus.publish("chunks_embedded", {
        "doc_id": doc_id,
        "embedding_count": len(embeddings)
    })

# Service C: Vector Store (listens for "chunks_embedded")
@event_bus.on("chunks_embedded")
async def store_vectors(event: dict):
    doc_id = event["doc_id"]
    await store_in_vector_db(doc_id)

Pros:

  • Highly scalable (services run independently)
  • Fault isolated (one failure doesn't block others)
  • Services can be in different languages/frameworks
  • Easy to add new services to pipeline

Cons:

  • More complex debugging (distributed trace needed)
  • Eventual consistency (temporary inconsistency possible)
  • Harder to test (requires message setup)
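The `event_bus` object in the examples above is assumed infrastructure. A tiny in-process stand-in, useful for tests where production would use Redis or Kafka, might look like this (names here are illustrative):

```python
import asyncio
from collections import defaultdict

class EventBus:
    """Tiny in-process event bus sketch with publish/subscribe semantics."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def on(self, event_name: str):
        """Register an async handler for an event name."""
        def decorator(func):
            self._handlers[event_name].append(func)
            return func
        return decorator

    async def publish(self, event_name: str, payload: dict) -> None:
        for handler in self._handlers[event_name]:
            await handler(payload)

event_bus = EventBus()
received = []

@event_bus.on("document_chunked")
async def embed(event: dict):
    received.append(event["doc_id"])
    # Chained event, as in the pipeline above
    await event_bus.publish("chunks_embedded", event)

@event_bus.on("chunks_embedded")
async def store(event: dict):
    received.append("stored:" + event["doc_id"])

asyncio.run(event_bus.publish("document_chunked", {"doc_id": "d1"}))
```

Because handlers are awaited in registration order, a published event flows through the chain deterministically here; a real broker would deliver asynchronously.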

Pattern 3: Workflow Orchestrator Service

Use when: You need both scalability and centralized control

# Dedicated orchestrator coordinates everything

class WorkflowOrchestrator:
    async def execute_workflow(self, workflow_id: str):
        # Get workflow definition
        workflow = await load_workflow(workflow_id)

        # Execute steps in order, handling failures
        for step in workflow.steps:
            try:
                result = await self.call_service(
                    step.service_endpoint,
                    step.input_data
                )
                # Store result
                await self.store_step_result(
                    workflow_id,
                    step.id,
                    result
                )
            except StepFailure as e:
                # Handle failure
                await self.handle_failure(workflow_id, step.id, e)
                break

# Client submits workflow request
@app.post("/workflows")
async def submit_workflow(workflow: WorkflowDefinition):
    workflow_id = await orchestrator.start_workflow(workflow)
    return {"workflow_id": workflow_id}

# Query workflow status
@app.get("/workflows/{workflow_id}/status")
async def get_status(workflow_id: str):
    status = await orchestrator.get_status(workflow_id)
    return status

Pros:

  • Centralized control and monitoring
  • Easy to add retries, timeouts, error handling
  • Good visibility (single dashboard for all workflows)
  • Works with both local and remote services

Cons:

  • Orchestrator is a bottleneck
  • Single point of failure if not properly designed
  • Requires dedicated service

OCapistaine Implementation

OCapistaine uses Sequential Orchestration for contribution analysis and Event-Driven Orchestration for document processing (via scheduler tasks).

Processor-Based Workflows

Location: app/processors/workflows/

OCapistaine defines reusable processor classes that implement the workflow pattern:

# Example: Contribution Analysis Workflow

from app.processors.workflows import BaseProcessor, ProcessingContext

class ContributionValidationProcessor(BaseProcessor):
    """Validates citizen contributions against charter rules."""

    async def execute(self, context: ProcessingContext) -> ProcessingContext:
        # Run validation using the Forseti agent
        result = await self.agent.validate(context.input_data)

        # Store result in context
        context.intermediate_results["validation"] = result
        context.metadata["validated_at"] = datetime.now()

        return context

Step Composition

Steps are chained together in tasks:

# app/services/tasks/task_contributions_analysis.py

async def task_contributions_analysis():
    # Load contributions
    contributions = await load_contributions()

    # Create workflow
    workflow = ContributionAnalysisWorkflow(
        processors=[
            ValidationProcessor(),
            ClassificationProcessor(),
            EvaluationProcessor(),
        ]
    )

    # Execute for each contribution
    for contrib in contributions:
        context = ProcessingContext(input_data=contrib)
        result = await workflow.execute(context)

        # Store final result
        await save_contribution_result(result)

Asynchronous Task Chains

The scheduler orchestrates multiple tasks in sequence:

# app/services/scheduler/__init__.py

async def orchestrate_task_chain():
    """
    Daily workflow:
    1. Load contributions (task_load_contributions)
    2. Analyze contributions (task_contributions_analysis)
    3. Evaluate accuracy (task_opik_evaluate)
    4. Generate report (task_generate_report)
    """
    chain = TaskChain()
    chain.add(task_load_contributions)
    chain.add(task_contributions_analysis)
    chain.add(task_opik_evaluate)
    chain.add(task_generate_report)

    result = await chain.execute_sequentially(
        continue_on_error=False,  # Stop on first failure
        timeout=600               # 10 min timeout
    )

    return result
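`TaskChain` is shown above as an existing helper. A hedged sketch of how `execute_sequentially` with `continue_on_error` and a per-task `timeout` might behave (this is an assumption about the API, not the scheduler's real implementation):

```python
import asyncio

class TaskChain:
    """Sketch: run registered async tasks in order, stopping on failure."""

    def __init__(self) -> None:
        self._tasks = []

    def add(self, task) -> None:
        self._tasks.append(task)

    async def execute_sequentially(self, continue_on_error=False, timeout=600):
        results = []
        for task in self._tasks:
            try:
                # Each task gets its own timeout budget
                results.append(await asyncio.wait_for(task(), timeout=timeout))
            except Exception as e:
                results.append(e)
                if not continue_on_error:
                    break  # Stop on first failure
        return results

async def load():
    return "loaded"

async def analyze():
    raise RuntimeError("analysis failed")

async def report():
    return "report"

chain = TaskChain()
chain.add(load)
chain.add(analyze)
chain.add(report)
results = asyncio.run(chain.execute_sequentially(continue_on_error=False))
```

With `continue_on_error=False`, the failing second task halts the chain, so the third task never runs.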

Designing Workflows

Step 1: Define the Input and Output

class WorkflowInput:
    raw_document: str
    source: str  # "firecrawl", "github", "facebook"
    metadata: dict

class WorkflowOutput:
    document_id: str
    extracted_themes: list[str]
    contributions: list[ContributionData]
    warnings: list[str]

Step 2: Design Processing Steps

Each step should be:

  • Idempotent - Safe to run multiple times
  • Isolated - Depends only on context input
  • Observable - Logs and traces intermediate state
  • Fault-tolerant - Handles errors gracefully

class DocumentChunkingProcessor(BaseProcessor):
    async def execute(self, context: ProcessingContext):
        document = context.input_data

        # Check if already completed (idempotency)
        if "chunks" in context.intermediate_results:
            return context

        # Process
        chunks = self.chunk(
            document,
            max_size=15000,
            overlap=500
        )

        # Store result
        context.intermediate_results["chunks"] = chunks

        # Log for observability
        logger.info(
            "chunks_created",
            extra={
                "document_id": context.metadata.get("document_id"),
                "chunk_count": len(chunks),
                "total_size": sum(len(c) for c in chunks)
            }
        )

        return context

Step 3: Assemble Workflow

class DocumentProcessingWorkflow:
    def __init__(self):
        self.steps = [
            DocumentChunkingProcessor(),
            EmbeddingProcessor(),
            VectorStoreProcessor(),
            DatasetCreationProcessor(),
        ]

    async def execute(self, document: str):
        context = ProcessingContext(input_data=document)

        for step in self.steps:
            try:
                context = await step.execute(context)
            except ProcessingError as e:
                context.errors.append(str(e))
                # Decide: continue or stop?
                if step.critical:
                    raise

        return context.output

Step 4: Test Each Step Independently

# Test a processor in isolation
async def test_chunking_processor():
    processor = DocumentChunkingProcessor()

    context = ProcessingContext(
        input_data="very long document...",
        metadata={"document_id": "test_123"}
    )

    result = await processor.execute(context)

    assert len(result.intermediate_results["chunks"]) > 0
    assert result.metadata["chunking_completed"]

Common Patterns

Retry with Exponential Backoff

import asyncio
from functools import wraps

def retry(max_attempts=3, base_delay=1):
    def decorator(func):
        @wraps(func)  # Preserve the wrapped function's name and docstring
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)  # Exponential backoff
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, base_delay=2)
async def call_external_service():
    result = await api_call()
    return result

Timeout Handling

async def execute_with_timeout(coro, timeout_seconds=30):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        raise ProcessingError(f"Task exceeded {timeout_seconds}s timeout")

Error Context Preservation

class ProcessingError(Exception):
    def __init__(self, step_name: str, original_error: Exception, context: dict):
        self.step_name = step_name
        self.original_error = original_error
        self.context = context
        super().__init__(f"{step_name} failed: {original_error}")

# Usage in a processor
try:
    result = await external_service()
except Exception as e:
    raise ProcessingError(
        step_name="embedding_generation",
        original_error=e,
        context={"document_id": doc_id, "attempted_at": datetime.now()}
    )

Conditional Branching

async def execute_workflow_with_branching(context):
    # Step 1: Analyze content type
    content_type = await analyze_content(context.input_data)
    context.metadata["content_type"] = content_type

    # Step 2: Branch based on type
    if content_type == "document":
        context = await DocumentProcessingWorkflow().execute(context)
    elif content_type == "image":
        context = await ImageProcessingWorkflow().execute(context)
    elif content_type == "video":
        context = await VideoProcessingWorkflow().execute(context)
    else:
        raise ValueError(f"Unknown content type: {content_type}")

    return context

Testing Orchestrated Workflows

Unit Testing (Individual Processors)

@pytest.mark.asyncio
async def test_validation_processor():
    processor = ValidationProcessor()
    context = ProcessingContext(
        input_data={"text": "Test contribution", "author": "test_user"}
    )

    result = await processor.execute(context)

    assert result.intermediate_results["is_valid"] is True

Integration Testing (Multi-Processor)

@pytest.mark.asyncio
async def test_full_workflow():
    workflow = ContributionAnalysisWorkflow()

    test_contribution = {
        "text": "A citizen contribution",
        "source": "github_issue"
    }

    context = ProcessingContext(input_data=test_contribution)
    result = await workflow.execute(context)

    # Verify the full workflow completed
    assert "validation" in result.intermediate_results
    assert "classification" in result.intermediate_results
    assert "evaluation" in result.intermediate_results

Load Testing

@pytest.mark.asyncio
async def test_workflow_under_load():
    workflow = ContributionAnalysisWorkflow()

    # Simulate 100 contributions
    tasks = [
        workflow.execute(ProcessingContext(input_data=contrib))
        for contrib in generate_test_contributions(100)
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Verify all completed and none failed
    assert sum(1 for r in results if not isinstance(r, Exception)) == 100

Monitoring and Observability

Workflow Metrics

Track at each step:

class WorkflowMetrics:
    step_name: str
    start_time: datetime
    end_time: datetime
    status: str  # "success", "failed", "timeout"
    duration_ms: int
    input_size: int
    output_size: int
    error_message: str | None = None

# Log to the observability platform
async def log_workflow_metrics(metrics: WorkflowMetrics):
    await opik_client.log(
        operation_name=f"workflow_{metrics.step_name}",
        metadata={
            "status": metrics.status,
            "duration_ms": metrics.duration_ms,
            "input_size": metrics.input_size,
        }
    )

Step-by-Step Tracing

@OpikTracer.track
async def execute_processor_with_tracing(processor, context):
    start = time.time()
    try:
        result = await processor.execute(context)
        duration = time.time() - start
        logger.info(f"{processor.__class__.__name__} completed", extra={
            "duration_ms": duration * 1000,
            "output_size": len(str(result))
        })
        return result
    except Exception as e:
        duration = time.time() - start
        logger.error(f"{processor.__class__.__name__} failed", extra={
            "duration_ms": duration * 1000,
            "error": str(e)
        })
        raise

Troubleshooting

Workflow Hangs

Symptom: Workflow starts but never completes

Causes:

  1. Deadlock in async operations
  2. Processor waiting for external service
  3. Resource exhaustion

Debug:

# Add a timeout and logging
async def execute_with_timeout(workflow):
    try:
        result = await asyncio.wait_for(
            workflow.execute(),
            timeout=600  # 10 min
        )
        return result
    except asyncio.TimeoutError:
        logger.error("Workflow timeout", extra={
            "workflow_id": workflow.id,
            "current_step": workflow.current_step
        })
        raise

Partial Failures

Symptom: Some steps succeed, some fail inconsistently

Causes:

  1. Processor not idempotent (different results on retry)
  2. Race conditions with shared state
  3. External service intermittently fails

Fix:

# Make the processor idempotent
async def execute(self, context):
    # Check if this step already completed
    step_id = f"{self.__class__.__name__}_{context.workflow_id}"
    if context.redis.exists(f"completed:{step_id}"):
        return context.intermediate_results[self.__class__.__name__]

    # Process
    result = await self.process()

    # Mark as completed (key expires after 24 h)
    context.redis.set(f"completed:{step_id}", "1", ex=86400)

    return result

Cascading Failures

Symptom: One step failure causes multiple subsequent steps to fail

Solution: Use error handling strategies

class WorkflowErrorStrategy:
    STOP_ON_ERROR = "stop"          # Fail the entire workflow
    CONTINUE_ON_ERROR = "continue"  # Skip the failed step, continue
    RETRY_ON_ERROR = "retry"        # Retry the failed step

# Usage
workflow = Workflow(error_strategy=WorkflowErrorStrategy.CONTINUE_ON_ERROR)
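A strategy-aware `Workflow` like the one used above could be sketched as follows. The class and its constructor signature are hypothetical, shown only to illustrate how `STOP_ON_ERROR` and `CONTINUE_ON_ERROR` change failure behavior:

```python
import asyncio

class WorkflowErrorStrategy:
    STOP_ON_ERROR = "stop"
    CONTINUE_ON_ERROR = "continue"

class Workflow:
    """Sketch: execute steps, applying the configured error strategy."""

    def __init__(self, steps, error_strategy=WorkflowErrorStrategy.STOP_ON_ERROR):
        self.steps = steps
        self.error_strategy = error_strategy

    async def execute(self, data):
        errors = []
        for step in self.steps:
            try:
                data = await step(data)
            except Exception as e:
                errors.append(e)
                if self.error_strategy == WorkflowErrorStrategy.STOP_ON_ERROR:
                    break  # Cascade prevention: stop the whole workflow
                # CONTINUE_ON_ERROR: skip this step, keep the previous data
        return data, errors

async def increment(x):
    return x + 1

async def failing_step(x):
    raise ValueError("step failed")

wf = Workflow(
    [increment, failing_step, increment],
    error_strategy=WorkflowErrorStrategy.CONTINUE_ON_ERROR,
)
data, errors = asyncio.run(wf.execute(0))  # data == 2, one recorded error
```

With `CONTINUE_ON_ERROR` the failing middle step is skipped and the last step still runs on the previous step's output, so a single failure does not cascade.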

Best Practices

DO

  • Make processors idempotent - Safe to retry
  • Store intermediate results - Enable recovery
  • Log at each step - Debug failures
  • Set timeouts - Prevent hangs
  • Test processors independently - Isolate issues
  • Use structured logging - Rich context
  • Monitor workflow metrics - Detect issues early

DON'T

  • Mix sync and async code - Use all async
  • Modify context in-place without saving - Lose state
  • Assume external services are reliable - Always retry
  • Use global state - Makes testing hard
  • Skip error handling - Workflows will fail unpredictably

References

Tools and Libraries

  • APScheduler - Task scheduling
  • Celery - Distributed task queue
  • Apache Airflow - Workflow orchestration platform
  • Temporal - Distributed workflow runtime
  • n8n - Visual workflow automation (used in Vaettir)

Examples

  • OCapistaine: app/processors/workflows/ for processor-based workflows
  • Vaettir: workflows/ for n8n-based orchestration

Last Updated: 2026-02-23
Applies To: All Locki ecosystem projects
Version: 1.0