Task: Opik Evaluate

Scheduled task that processes recent Opik spans and runs evaluation experiments.

Overview

task_opik_evaluate handles the async nature of Opik span ingestion (spans take ~3 minutes to appear after creation) by running periodically to:

  1. Clean up error traces (optional)
  2. Search for recent spans not yet added to a dataset
  3. Create a dataset from those spans
  4. Run Opik evaluate() with configured metrics
  5. Report results

Schedule

# Cron: Every 30 minutes, 7 AM - 10 PM
OPIK_EVALUATE_CRON = "*/30 7-22 * * *"
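To make the schedule's boundaries explicit, the cron expression can be rewritten as a plain predicate (a sketch for illustration only; the real scheduler parses the cron string itself). Note that because the hour field is 7-22, the last tick of the day is 22:30, not 22:00:

```python
from datetime import datetime

# "*/30 7-22 * * *" fires on minutes 0 and 30 of every hour from 07 to 22.
def matches_schedule(dt: datetime) -> bool:
    """True when dt falls on a tick of the */30 7-22 * * * schedule."""
    return dt.minute % 30 == 0 and 7 <= dt.hour <= 22
```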

Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| date_string | str | today | Date in YYYYMMDD format |
| experiment_type | str | "charter_optimization" | Type from AGENT_FEATURE_REGISTRY |
| max_items | int | 50 | Maximum spans to include |
| lookback_hours | int | 24 | Hours to look back for spans |
| metrics | list[str] | ["hallucination", "output_format"] | Opik metrics to use |
| task_provider | str | "gemini" | LLM provider for the evaluation task |
| skip_if_empty | bool | True | Skip without error if no new spans |
| cleanup_errors | bool | True | Delete error traces before processing |

Workflow

┌─────────────────────────────────────────────────────────────┐
│ task_opik_evaluate                                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Step 0: Cleanup Error Traces (if cleanup_errors=True)       │
│          Delete traces with "error", "retries exhausted"    │
│                              ↓                              │
│ Step 1: Search Recent Spans                                 │
│          Filter: name = "{span_name}" AND type = "llm"      │
│                              ↓                              │
│ Step 2: Filter Already-Added                                │
│          Exclude spans with added_to_dataset feedback       │
│                              ↓                              │
│ Step 3: Create Dataset                                      │
│          dataset: {prefix}-{date}-{time}                    │
│                              ↓                              │
│ Step 4: Run Opik Evaluate                                   │
│          experiment: {feature}-eval-{date}-{time}           │
│                                                             │
└─────────────────────────────────────────────────────────────┘
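The steps above can be condensed into a small sketch. The helper name, span dicts, and filtering keys here are illustrative stand-ins, not the actual implementation:

```python
def run_pipeline(spans, already_added, cleanup_errors=True, max_items=50):
    """Illustrative condensation of steps 0-3 of task_opik_evaluate."""
    errors_removed = 0
    if cleanup_errors:
        # Step 0: drop spans whose status looks like an error
        before = len(spans)
        spans = [s for s in spans if "error" not in s.get("status", "")]
        errors_removed = before - len(spans)
    # Steps 1-2: keep only LLM spans not yet added to a dataset
    new_spans = [s for s in spans
                 if s["type"] == "llm" and s["id"] not in already_added]
    if not new_spans:
        return {"status": "skipped", "spans_new": 0}
    dataset_items = new_spans[:max_items]  # Step 3: cap the dataset size
    return {"status": "success",
            "spans_new": len(new_spans),
            "dataset_size": len(dataset_items),
            "errors_removed": errors_removed}
```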

Usage

Via Admin Dashboard

  1. Navigate to Admin tab
  2. Find task_opik_evaluate in Manual Triggers
  3. Configure:
    • Experiment type
    • LLM provider
    • Max items
  4. Click Run Now

Via Scheduler

The task runs automatically every 30 minutes (7 AM - 10 PM). To change the schedule:

# In app/services/scheduler/__init__.py
OPIK_EVALUATE_CRON = "0 */2 * * *" # Every 2 hours instead

Programmatically

from app.services.tasks.task_opik_evaluate import task_opik_evaluate

result = task_opik_evaluate(
    experiment_type="charter_optimization",
    max_items=100,
    metrics=["hallucination", "moderation", "output_format"],
    task_provider="ollama",
    cleanup_errors=True,
)

Result Structure

{
    "status": "success",  # or "skipped", "failed"
    "task_id": "task_opik_evaluate",
    "date_string": "20260204",
    "experiment_type": "charter_optimization",
    "max_items": 50,
    "lookback_hours": 24,
    "metrics": ["hallucination", "output_format"],
    "task_provider": "gemini",
    "cleanup_errors": True,
    "cleanup_result": {
        "project": "ocapistaine-test",
        "total_traces": 408,
        "error_traces": 5,
        "deleted": 5,
        "error_patterns": {"error": 3, "retries exhausted": 2}
    },
    "spans_found": 25,
    "spans_new": 12,
    "dataset_name": "charter-optimization-20260204-143052",
    "experiment_result": {
        "status": "success",
        "experiment_name": "charter_validation-eval-20260204-143052",
        "eval_results": {...}
    },
    "errors": [],
    "warnings": []
}
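Callers typically branch on the top-level status. A small illustrative helper (not part of the codebase) that consumes this structure:

```python
def summarize(result: dict) -> str:
    """Render a one-line summary of a task_opik_evaluate result dict."""
    status = result["status"]
    if status == "skipped":
        return "skipped: no new spans"
    if status == "failed":
        return "failed: " + "; ".join(result.get("errors", []))
    return (f"success: {result['spans_new']} new spans -> "
            f"dataset {result['dataset_name']}")
```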

Metrics

Default Metrics

| Metric | Type | Description |
|--------|------|-------------|
| hallucination | builtin | Detects false information (LLM judge) |
| output_format | custom | Measures format compliance (0-1 scale) |
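To illustrate what a 0-1 format-compliance score looks like, here is a minimal sketch of an output_format-style scorer. The JSON-key check and the required_keys names are assumptions for illustration, not the project's actual rules:

```python
import json

def output_format_score(output: str, required_keys=("title", "body")) -> float:
    """Fraction of required top-level JSON keys present in the model output."""
    try:
        data = json.loads(output)
    except (ValueError, TypeError):
        return 0.0  # not valid JSON at all
    if not isinstance(data, dict):
        return 0.0
    present = sum(1 for k in required_keys if k in data)
    return present / len(required_keys)
```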

Available Metrics

from app.processors.workflows.workflow_experiment import list_available_metrics

for m in list_available_metrics():
    print(f"{m['name']}: {m['description']} ({m['type']})")

Error Cleanup

The task can automatically delete error traces before processing. This prevents:

  • Polluted optimization data
  • Divergent experiment results
  • Wasted compute on invalid traces

Error Patterns Detected

  • "error" - Generic errors
  • "retries exhausted" - API retry failures
  • "rate limit" - Rate limiting errors
  • "validation error" - LLM validation failures
  • "timeout" - Request timeouts
  • "failed" - Generic failures
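Detection along these lines amounts to a case-insensitive substring match. A sketch of that check (the matching details of the real cleanup may differ):

```python
ERROR_PATTERNS = ("error", "retries exhausted", "rate limit",
                  "validation error", "timeout", "failed")

def is_error_trace(output) -> bool:
    """Case-insensitive substring match against the known error patterns."""
    text = (output or "").lower()
    return any(pattern in text for pattern in ERROR_PATTERNS)
```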

Disable Cleanup

result = task_opik_evaluate(cleanup_errors=False)

Redis Keys

| Key Pattern | TTL | Purpose |
|-------------|-----|---------|
| lock:task_opik_evaluate:{date} | 5 min | Prevent concurrent runs |
| success:task_opik_evaluate:{date} | 24h | Track completion |

Note: the task uses skip_success_check=True so it can run multiple times per day.
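The lock key follows the usual set-if-not-exists-with-TTL pattern (with a real Redis client this is set(key, value, nx=True, ex=300)). An in-memory stand-in, enough to show the semantics:

```python
import time

class FakeLockStore:
    """In-memory stand-in for Redis SET NX EX; illustration only."""

    def __init__(self):
        self._expiry = {}  # key -> monotonic expiry timestamp

    def acquire(self, key: str, ttl_seconds: int) -> bool:
        now = time.monotonic()
        expiry = self._expiry.get(key)
        if expiry is not None and expiry > now:
            return False  # lock still held: concurrent run refused
        self._expiry[key] = now + ttl_seconds
        return True

store = FakeLockStore()
lock_key = "lock:task_opik_evaluate:20260204"
first = store.acquire(lock_key, ttl_seconds=300)   # 5 min TTL
second = store.acquire(lock_key, ttl_seconds=300)  # concurrent attempt fails
```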

Files

| File | Purpose |
|------|---------|
| app/services/tasks/task_opik_evaluate.py | Task implementation |
| app/processors/workflows/workflow_experiment.py | Experiment execution |
| app/services/scheduler/__init__.py | Cron registration |

See Also