Chapter 18: Monitoring and Observability
An autonomous agent that fails silently is undetectable without good observability. Unlike web services, where an error surfaces immediately, an agent can run 20 tools, burn $0.50 in tokens, and produce an incorrect result without any traditional monitoring system noticing. This chapter builds a complete observability system for agents.
1. The Three Pillars: Logs, Metrics, Traces
Applied to AI Agents
```mermaid
flowchart TB
    subgraph Pillars["The 3 Pillars of Observability"]
        direction LR
        subgraph Logs["Logs (what happened?)"]
            L1["Query received\nsession_id: abc"]
            L2["Tool called: Bash\ninput: ls -la"]
            L3["Tool result: OK\noutput: 3 files"]
            L4["Query completed\ncost: $0.05"]
        end
        subgraph Metrics["Metrics (how much / how often?)"]
            M1["queries_total: 1547"]
            M2["latency_p99: 45s"]
            M3["cost_usd_total: $23.4"]
            M4["error_rate: 1.2%"]
        end
        subgraph Traces["Traces (why was it slow?)"]
            T1["Span: total query\n45.2s"]
            T2["Span: LLM call 1\n3.1s"]
            T3["Span: Bash tool\n0.5s"]
            T4["Span: LLM call 2\n2.8s"]
        end
    end
    subgraph Stack["Recommended Stack"]
        S1["Structlog / Pino"]
        S2["Prometheus + Grafana"]
        S3["OpenTelemetry + Jaeger"]
    end
    Logs --> S1
    Metrics --> S2
    Traces --> S3
```
Why Observability Is Critical for Autonomous Agents
Autonomous agents have characteristics that make observability especially critical:
- Long duration: A 30-minute session can fail at minute 28, losing all the work and the cost already incurred.
- Variable cost: A prompt bug can send the agent into an infinite tool-calling loop, racking up $50 in minutes.
- Non-determinism: The same prompt can produce different results. Without logs of the reasoning, debugging is impossible.
- Real side effects: The agent can modify files, execute code, send emails. Errors have real consequences.
- Multi-step: An error at step 15 of 20 can be caused by a wrong decision at step 3. Distributed traces are the only way to understand this.
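The variable-cost risk above can be capped with a simple budget guard that aborts the agent loop before a runaway prompt burns real money. A minimal sketch (the `CostBudget` class and its limits are illustrative, not part of any SDK):

```python
# cost_guard.py -- illustrative budget guard against runaway agent cost
class BudgetExceededError(RuntimeError):
    """Raised when the accumulated cost crosses the configured limit."""

class CostBudget:
    def __init__(self, limit_usd: float) -> None:
        self.limit_usd = limit_usd
        self.spent_usd = 0.0

    def charge(self, cost_usd: float) -> None:
        """Record the cost of one message; abort if over budget."""
        self.spent_usd += cost_usd
        if self.spent_usd > self.limit_usd:
            raise BudgetExceededError(
                f"spent ${self.spent_usd:.2f} > limit ${self.limit_usd:.2f}"
            )

# Inside the agent's message loop you would call budget.charge(message.cost_usd)
# on every message that carries cost, stopping the loop instead of
# letting a bad prompt iterate indefinitely.
```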
Recommended Stack
| Pillar | Python tool | TypeScript tool | Backend |
|---|---|---|---|
| Logs | structlog | pino | Loki + Grafana |
| Metrics | prometheus-client | prom-client | Prometheus + Grafana |
| Traces | opentelemetry-sdk | @opentelemetry/sdk-node | Jaeger / Tempo |
| Errors | sentry-sdk | @sentry/node | Sentry Cloud |
| Costs | Custom counter | Custom counter | Grafana + PostgreSQL |
2. Structured Logging
Python with structlog
```python
# logging_setup.py
import logging
import sys

import structlog

def configure_logging(level: str = "INFO", json_output: bool = True) -> None:
    """
    Configures structlog for structured logging.

    In production use json_output=True so the log aggregator
    (Loki, CloudWatch, etc.) can parse and filter fields.
    """
    processors = [
        # Add an ISO 8601 timestamp
        structlog.processors.TimeStamper(fmt="iso"),
        # Add the level as a string
        structlog.stdlib.add_log_level,
        # Merge fields bound via bind_contextvars
        structlog.contextvars.merge_contextvars,
        # Add stack traces on exceptions
        structlog.processors.StackInfoRenderer(),
        structlog.processors.ExceptionRenderer(),
        # Note: structlog.stdlib.add_logger_name is omitted because it
        # requires a stdlib logger (with a .name attribute); it would
        # raise with PrintLoggerFactory below.
    ]
    if json_output:
        processors.append(structlog.processors.JSONRenderer())
    else:
        # For development: human-readable output
        processors.append(structlog.dev.ConsoleRenderer())
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            logging.getLevelName(level)  # maps "INFO" -> 20, etc.
        ),
        logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
        cache_logger_on_first_use=True,
    )
```
Usage with automatic per-query context:
```python
# agent_logger.py
import time
import uuid
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

logger = structlog.get_logger()

@asynccontextmanager
async def agent_logging_context(
    session_id: str | None = None,
    user_id: str | None = None,
    agent_name: str = "default"
) -> AsyncGenerator[structlog.BoundLogger, None]:
    """
    Context manager that automatically attaches context to every log
    emitted inside the block.
    """
    query_id = str(uuid.uuid4())[:8]
    start_time = time.monotonic()
    # Bind context: every log inside the block will include these fields
    bind_contextvars(
        query_id=query_id,
        session_id=session_id,
        user_id=user_id,
        agent_name=agent_name
    )
    log = logger.bind(query_id=query_id)
    try:
        log.info("query_started", agent=agent_name)
        yield log
        duration = time.monotonic() - start_time
        log.info("query_completed", duration_seconds=round(duration, 2))
    except Exception as e:
        duration = time.monotonic() - start_time
        log.error(
            "query_failed",
            error=str(e),
            error_type=type(e).__name__,
            duration_seconds=round(duration, 2)
        )
        raise
    finally:
        clear_contextvars()
```
Logging each tool invocation:
```python
# tool_logger.py
import re
import time

import structlog
from claude_code_sdk import ClaudeCodeOptions, query

from agent_logger import agent_logging_context

logger = structlog.get_logger()

def _redact_sensitive(text: str) -> str:
    """Redacts sensitive data from logs."""
    # Redact API keys
    text = re.sub(r'sk-ant-[a-zA-Z0-9-]+', 'sk-ant-[REDACTED]', text)
    # Redact passwords
    text = re.sub(r'password["\s:=]+[^\s"]+', 'password=[REDACTED]', text, flags=re.IGNORECASE)
    # Redact JWT tokens
    text = re.sub(r'eyJ[a-zA-Z0-9._-]+', '[JWT_REDACTED]', text)
    return text

async def run_agent_with_tool_logging(
    prompt: str,
    agent_name: str = "default"
) -> str:
    """Runs the agent, logging each tool with its input/output."""
    results = []
    tool_calls = []
    async with agent_logging_context(agent_name=agent_name) as log:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(
                allowed_tools=["Read", "Edit", "Bash"]
            )
        ):
            message_type = type(message).__name__
            if message_type == "AssistantMessage":
                # Log the LLM's reasoning (truncated)
                content = str(getattr(message, 'content', ''))
                log.debug(
                    "llm_response",
                    preview=content[:200] + "..." if len(content) > 200 else content
                )
            elif message_type == "ToolUseBlock":
                tool_name = getattr(message, 'name', 'unknown')
                tool_input = getattr(message, 'input', {})
                # Redact sensitive data from the input
                safe_input = _redact_sensitive(str(tool_input))
                tool_start = time.monotonic()
                log.info(
                    "tool_called",
                    tool=tool_name,
                    input_preview=safe_input[:300]
                )
                tool_calls.append({
                    "tool": tool_name,
                    "start": tool_start
                })
            elif message_type == "ToolResultBlock":
                if tool_calls:
                    last_call = tool_calls[-1]
                    duration = time.monotonic() - last_call["start"]
                    tool_output = str(getattr(message, 'content', ''))
                    safe_output = _redact_sensitive(tool_output)
                    log.info(
                        "tool_completed",
                        tool=last_call["tool"],
                        duration_seconds=round(duration, 3),
                        output_bytes=len(tool_output),
                        output_preview=safe_output[:200]
                    )
            elif hasattr(message, 'content'):
                content = str(message.content)
                if content:
                    results.append(content)
        log.info("tools_summary", total_tool_calls=len(tool_calls))
    return "\n".join(results)
```
TypeScript with Pino
```typescript
// logger.ts
import { randomUUID } from "node:crypto";
import pino, { Logger } from "pino";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const logger = pino({
  level: process.env.LOG_LEVEL ?? "info",
  // In production: JSON for log aggregators
  // In development: pretty print
  transport:
    process.env.NODE_ENV === "development"
      ? { target: "pino-pretty", options: { colorize: true } }
      : undefined,
  base: {
    service: "claude-agent",
    version: process.env.npm_package_version,
  },
  // Automatically redact sensitive fields
  redact: {
    paths: ["*.api_key", "*.password", "*.token", "*.secret"],
    censor: "[REDACTED]",
  },
});

interface AgentContext {
  queryId: string;
  sessionId?: string;
  userId?: string;
  agentName: string;
}

function createChildLogger(ctx: AgentContext): Logger {
  return logger.child(ctx);
}

async function runAgentWithLogging(
  prompt: string,
  agentName: string = "default"
): Promise<string> {
  const queryId = randomUUID().slice(0, 8);
  const log = createChildLogger({ queryId, agentName });
  const startTime = Date.now();
  log.info({ promptPreview: prompt.slice(0, 100) }, "query_started");
  const results: string[] = [];
  const toolCalls: Array<{ tool: string; start: number }> = [];
  try {
    for await (const message of query(prompt, {
      allowedTools: ["Read", "Edit", "Bash"],
      maxTurns: 50,
    } as ClaudeCodeOptions)) {
      const messageType = (message as { type?: string }).type;
      if (messageType === "tool_use") {
        const toolName = (message as { name?: string }).name ?? "unknown";
        log.info({ tool: toolName }, "tool_called");
        toolCalls.push({ tool: toolName, start: Date.now() });
      }
      if (messageType === "tool_result" && toolCalls.length > 0) {
        const last = toolCalls[toolCalls.length - 1];
        const durationMs = Date.now() - last.start;
        log.info({ tool: last.tool, durationMs }, "tool_completed");
      }
      if ("content" in message) {
        const content = String(message.content);
        if (content) results.push(content);
      }
    }
    const durationMs = Date.now() - startTime;
    log.info(
      { durationMs, totalToolCalls: toolCalls.length },
      "query_completed"
    );
    return results.join("\n");
  } catch (error) {
    const err = error as Error;
    const durationMs = Date.now() - startTime;
    log.error({ error: err.message, durationMs }, "query_failed");
    throw error;
  }
}

export { logger, createChildLogger, runAgentWithLogging };
```
Log Levels by Event Type
| Level | When to use | Examples |
|---|---|---|
| DEBUG | Development, diagnosis | Full tool input/output, LLM reasoning |
| INFO | Normal business events | Query started/completed, tool called |
| WARN | Unusual but non-critical situations | Retry performed, high cost, approaching timeout |
| ERROR | Failures that need attention | Query failed, tool failed, API error |
| FATAL | The process cannot continue | Invalid config, port in use |
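The `configure_logging` helper shown earlier filters by level using the standard library's mapping between level names and numbers. A quick stdlib check of how that mapping behaves, which is worth knowing before accepting level strings from config:

```python
import logging

# logging.getLevelName maps in both directions: a number returns its
# name, and a registered name returns its number.
assert logging.getLevelName(20) == "INFO"
assert logging.getLevelName("INFO") == 20
assert logging.getLevelName("ERROR") == 40

# Unknown names do not raise; they come back as "Level <name>" strings,
# so validate user-supplied level strings before passing them on.
print(logging.getLevelName("DEBUG"))  # → 10
```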
Log Rotation and Retention
```python
# log_rotation.py
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

def configure_file_logging(log_dir: str = "/app/logs") -> None:
    """Configures file-based log rotation."""
    # Size-based rotation: at most 100MB per file, 10 files kept
    size_handler = RotatingFileHandler(
        f"{log_dir}/agent.log",
        maxBytes=100 * 1024 * 1024,  # 100MB
        backupCount=10,
        encoding="utf-8"
    )
    # Daily rotation: keep 30 days
    daily_handler = TimedRotatingFileHandler(
        f"{log_dir}/agent-daily.log",
        when="midnight",
        interval=1,
        backupCount=30,
        encoding="utf-8"
    )
    logging.getLogger().addHandler(size_handler)
    logging.getLogger().addHandler(daily_handler)
```
3. Metrics with Prometheus
Key Metrics for Agents
```mermaid
flowchart LR
    subgraph Counters["Counters (only ever increase)"]
        C1["queries_total{status, agent}"]
        C2["tool_calls_total{tool, status}"]
        C3["cost_usd_total{agent}"]
        C4["errors_total{type, agent}"]
    end
    subgraph Histograms["Histograms (distributions)"]
        H1["query_duration_seconds{agent}"]
        H2["tool_duration_seconds{tool}"]
        H3["tokens_used{direction, agent}"]
    end
    subgraph Gauges["Gauges (current value)"]
        G1["active_queries{agent}"]
        G2["queue_depth"]
        G3["session_pool_size"]
    end
```
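Prometheus histograms expose cumulative bucket counters (`_bucket{le="..."}`): each observation increments every bucket whose upper bound is at or above the observed value, plus an implicit `+Inf` bucket. A pure-Python sketch of that mechanic, independent of any client library:

```python
import bisect

def observe(bounds: list[float], counts: list[int], value: float) -> None:
    """Increment every cumulative bucket whose upper bound >= value.

    counts has one extra slot at the end for the implicit +Inf bucket.
    """
    # Index of the first bucket whose bound is >= value (le is inclusive).
    i = bisect.bisect_left(bounds, value)
    for j in range(i, len(counts)):
        counts[j] += 1

bounds = [5, 10, 30, 60]          # same shape as agent_query_duration_seconds
counts = [0] * (len(bounds) + 1)  # last slot is +Inf
for duration in [3, 12, 45, 200]:
    observe(bounds, counts, duration)

print(counts)  # → [1, 1, 2, 3, 4]
```

The cumulative shape is what lets `histogram_quantile` estimate percentiles from rates of these counters, as the dashboard queries in Section 5 do.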
Python Implementation with prometheus-client
```python
# metrics.py
import time
from functools import wraps
from typing import Any, Callable

from claude_code_sdk import ClaudeCodeOptions, query
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)

# Dedicated registry (avoids clashes with default process metrics)
REGISTRY = CollectorRegistry()

# ============================================================
# Metric definitions
# ============================================================
QUERIES_TOTAL = Counter(
    "agent_queries_total",
    "Total queries executed by the agent",
    ["status", "agent_name", "model"],
    registry=REGISTRY
)
TOOL_CALLS_TOTAL = Counter(
    "agent_tool_calls_total",
    "Total tool invocations",
    ["tool_name", "status", "agent_name"],
    registry=REGISTRY
)
COST_USD_TOTAL = Counter(
    "agent_cost_usd_total",
    "Total cost in USD incurred by the agent",
    ["agent_name", "model"],
    registry=REGISTRY
)
ERRORS_TOTAL = Counter(
    "agent_errors_total",
    "Total errors by type",
    ["error_type", "agent_name"],
    registry=REGISTRY
)
QUERY_DURATION = Histogram(
    "agent_query_duration_seconds",
    "Query duration in seconds",
    ["agent_name"],
    buckets=[5, 10, 30, 60, 120, 300, 600, 1200, 1800],
    registry=REGISTRY
)
TOOL_DURATION = Histogram(
    "agent_tool_duration_seconds",
    "Duration of each tool in seconds",
    ["tool_name"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60],
    registry=REGISTRY
)
ACTIVE_QUERIES = Gauge(
    "agent_active_queries",
    "Number of queries currently running",
    ["agent_name"],
    registry=REGISTRY
)
TOKENS_USED = Histogram(
    "agent_tokens_used_total",
    "Tokens used per query",
    ["direction", "agent_name"],  # direction: input/output
    buckets=[100, 500, 1000, 5000, 10000, 50000, 100000],
    registry=REGISTRY
)

# ============================================================
# Decorator that records metrics automatically
# ============================================================
def track_agent_metrics(agent_name: str = "default"):
    """Decorator that automatically tracks metrics for an agent function."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            ACTIVE_QUERIES.labels(agent_name=agent_name).inc()
            start_time = time.monotonic()
            status = "success"
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                status = "error"
                ERRORS_TOTAL.labels(
                    error_type=type(e).__name__,
                    agent_name=agent_name
                ).inc()
                raise
            finally:
                duration = time.monotonic() - start_time
                ACTIVE_QUERIES.labels(agent_name=agent_name).dec()
                QUERIES_TOTAL.labels(
                    status=status,
                    agent_name=agent_name,
                    model="claude-3-5-sonnet"
                ).inc()
                QUERY_DURATION.labels(agent_name=agent_name).observe(duration)
        return wrapper
    return decorator

# ============================================================
# Full usage with the SDK
# ============================================================
@track_agent_metrics(agent_name="code_reviewer")
async def run_code_review_agent(file_path: str) -> str:
    results = []
    total_cost = 0.0
    async for message in query(
        prompt=f"Review the code in {file_path} and find bugs",
        options=ClaudeCodeOptions(
            allowed_tools=["Read", "Bash"],
            max_turns=20
        )
    ):
        # Track cost
        if hasattr(message, 'cost_usd') and message.cost_usd:
            cost = float(message.cost_usd)
            total_cost += cost
            COST_USD_TOTAL.labels(
                agent_name="code_reviewer",
                model="claude-3-5-sonnet"
            ).inc(cost)
        # Track tools
        if hasattr(message, 'type') and message.type == "tool_use":
            tool_name = getattr(message, 'name', 'unknown')
            # Note: the result arrives in a later message; a real
            # implementation would correlate input and output and
            # observe TOOL_DURATION as well.
            TOOL_CALLS_TOTAL.labels(
                tool_name=tool_name,
                status="called",
                agent_name="code_reviewer"
            ).inc()
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)

# ============================================================
# Metrics endpoint for Prometheus
# ============================================================
def get_metrics() -> tuple[bytes, str]:
    """Returns metrics in Prometheus exposition format."""
    return generate_latest(REGISTRY), CONTENT_TYPE_LATEST
```
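The p50/p95/p99 panels in Section 5 derive quantiles from these buckets with PromQL's `histogram_quantile`, which linearly interpolates inside the bucket containing the target rank. A simplified pure-Python version of that computation, to make the estimate concrete:

```python
def histogram_quantile(q: float, bounds: list[float], cum_counts: list[int]) -> float:
    """Approximate PromQL histogram_quantile over cumulative buckets.

    bounds are the finite upper bounds; cum_counts includes the +Inf slot.
    Linearly interpolates inside the bucket that contains the target rank.
    """
    total = cum_counts[-1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in zip(bounds, cum_counts):
        if count >= rank:
            # Interpolate between this bucket's lower and upper bound.
            frac = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * frac
        prev_bound, prev_count = bound, count
    return bounds[-1]  # rank fell in the +Inf bucket: clamp to last bound

# 100 observations, buckets le=5,10,30 (+Inf): cumulative 20, 70, 95, 100.
# The median rank (50) falls in the 5..10 bucket, 60% of the way in.
print(histogram_quantile(0.5, [5, 10, 30], [20, 70, 95, 100]))  # → 8.0
```

This is also why bucket choice matters: quantiles are only as precise as the bucket edges around the values you care about.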
TypeScript Implementation with prom-client
```typescript
// metrics.ts
import {
  Registry,
  Counter,
  Histogram,
  Gauge,
  collectDefaultMetrics,
} from "prom-client";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const register = new Registry();
collectDefaultMetrics({ register });

// ============================================================
// Metric definitions
// ============================================================
const queriesTotal = new Counter({
  name: "agent_queries_total",
  help: "Total queries executed",
  labelNames: ["status", "agent_name", "model"] as const,
  registers: [register],
});

const toolCallsTotal = new Counter({
  name: "agent_tool_calls_total",
  help: "Total tool invocations",
  labelNames: ["tool_name", "status", "agent_name"] as const,
  registers: [register],
});

const costUsdTotal = new Counter({
  name: "agent_cost_usd_total",
  help: "Total cost in USD",
  labelNames: ["agent_name", "model"] as const,
  registers: [register],
});

const queryDuration = new Histogram({
  name: "agent_query_duration_seconds",
  help: "Query duration in seconds",
  labelNames: ["agent_name"] as const,
  buckets: [5, 10, 30, 60, 120, 300, 600, 1800],
  registers: [register],
});

const toolDuration = new Histogram({
  name: "agent_tool_duration_seconds",
  help: "Tool duration in seconds",
  labelNames: ["tool_name"] as const,
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30],
  registers: [register],
});

const activeQueries = new Gauge({
  name: "agent_active_queries",
  help: "Queries currently running",
  labelNames: ["agent_name"] as const,
  registers: [register],
});

// ============================================================
// Agent with metrics
// ============================================================
async function runAgentWithMetrics(
  prompt: string,
  agentName: string = "default"
): Promise<string> {
  const startTime = Date.now();
  activeQueries.labels({ agent_name: agentName }).inc();
  const results: string[] = [];
  let status = "success";
  try {
    for await (const message of query(prompt, {
      allowedTools: ["Read", "Bash"],
      maxTurns: 30,
    } as ClaudeCodeOptions)) {
      if ("cost_usd" in message && typeof message.cost_usd === "number") {
        costUsdTotal
          .labels({ agent_name: agentName, model: "claude-3-5-sonnet" })
          .inc(message.cost_usd);
      }
      if ("type" in message && (message as { type: string }).type === "tool_use") {
        const toolName =
          (message as { name?: string }).name ?? "unknown";
        toolCallsTotal
          .labels({ tool_name: toolName, status: "called", agent_name: agentName })
          .inc();
      }
      if ("content" in message) {
        const content = String(message.content);
        if (content) results.push(content);
      }
    }
    return results.join("\n");
  } catch (error) {
    status = "error";
    throw error;
  } finally {
    const durationSeconds = (Date.now() - startTime) / 1000;
    activeQueries.labels({ agent_name: agentName }).dec();
    queriesTotal
      .labels({ status, agent_name: agentName, model: "claude-3-5-sonnet" })
      .inc();
    queryDuration.labels({ agent_name: agentName }).observe(durationSeconds);
  }
}

async function getMetrics(): Promise<{ body: string; contentType: string }> {
  return {
    body: await register.metrics(),
    contentType: register.contentType,
  };
}

export {
  register,
  queriesTotal,
  toolCallsTotal,
  costUsdTotal,
  queryDuration,
  toolDuration,
  activeQueries,
  runAgentWithMetrics,
  getMetrics,
};
```
4. OpenTelemetry for Traces
Tracing Concepts for Agents
```mermaid
gantt
    title Distributed trace of one agent query
    dateFormat X
    axisFormat %Ss
    section Main query
    Span: query_total :0, 45
    section LLM Calls
    Span: llm_call_1 :0, 3
    Span: llm_call_2 :8, 5
    Span: llm_call_3 :20, 4
    Span: llm_call_4 :35, 6
    section Tool Calls
    Span: tool_read_file :3, 1
    Span: tool_bash_ls :13, 2
    Span: tool_edit_file :24, 6
    Span: tool_bash_test :31, 3
```
Python Implementation with OpenTelemetry
```python
# tracing.py
import os

from claude_code_sdk import ClaudeCodeOptions, query
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

def configure_tracing(service_name: str = "claude-agent") -> trace.Tracer:
    """Configures OpenTelemetry with a Jaeger exporter."""
    resource = Resource.create({
        "service.name": service_name,
        "service.version": os.getenv("APP_VERSION", "unknown"),
        "deployment.environment": os.getenv("ENVIRONMENT", "production"),
    })
    # Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name=os.getenv("JAEGER_HOST", "localhost"),
        agent_port=int(os.getenv("JAEGER_PORT", "6831")),
    )
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

tracer = configure_tracing()

async def run_agent_with_tracing(
    prompt: str,
    agent_name: str = "default",
    parent_context: dict | None = None
) -> str:
    """Runs the agent with full traces of every step."""
    # Propagate the trace context from the caller (for distributed systems)
    propagator = TraceContextTextMapPropagator()
    context = propagator.extract(parent_context or {})
    with tracer.start_as_current_span(
        "agent.query",
        context=context,
        attributes={
            "agent.name": agent_name,
            "prompt.length": len(prompt),
            "prompt.preview": prompt[:100],
        }
    ) as query_span:
        results = []
        llm_call_count = 0
        tool_call_count = 0
        total_cost = 0.0
        current_tool_span = None
        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    allowed_tools=["Read", "Edit", "Bash"],
                    max_turns=50
                )
            ):
                message_type = type(message).__name__
                if message_type == "AssistantMessage":
                    llm_call_count += 1
                    # One span per LLM call
                    with tracer.start_as_current_span(
                        "agent.llm_call",
                        attributes={
                            "llm.call_number": llm_call_count,
                            "model": "claude-3-5-sonnet",
                        }
                    ) as llm_span:
                        content = str(getattr(message, 'content', ''))
                        llm_span.set_attribute("response.length", len(content))
                elif message_type == "ToolUseBlock":
                    tool_name = getattr(message, 'name', 'unknown')
                    tool_call_count += 1
                    # Open a span for the tool (closed on ToolResultBlock)
                    current_tool_span = tracer.start_span(
                        f"agent.tool.{tool_name}",
                        attributes={
                            "tool.name": tool_name,
                            "tool.call_number": tool_call_count,
                        }
                    )
                elif message_type == "ToolResultBlock" and current_tool_span:
                    output = str(getattr(message, 'content', ''))
                    current_tool_span.set_attribute("tool.output_bytes", len(output))
                    current_tool_span.set_attribute(
                        "tool.status",
                        "error" if getattr(message, 'is_error', False) else "success"
                    )
                    current_tool_span.end()
                    current_tool_span = None
                if hasattr(message, 'cost_usd') and message.cost_usd:
                    # Accumulate locally: span attributes are write-only in
                    # the OpenTelemetry API, so never read them back.
                    total_cost += float(message.cost_usd)
                    query_span.set_attribute("agent.cost_usd", round(total_cost, 6))
                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))
            query_span.set_attribute("agent.llm_calls", llm_call_count)
            query_span.set_attribute("agent.tool_calls", tool_call_count)
            query_span.set_attribute("agent.status", "success")
        except Exception as e:
            query_span.set_attribute("agent.status", "error")
            query_span.record_exception(e)
            if current_tool_span:
                current_tool_span.set_attribute("tool.status", "error")
                current_tool_span.end()
            raise
        return "\n".join(results)
```
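The span hierarchy this produces can be pictured with a toy in-memory tracer. An illustrative sketch (deliberately not the OpenTelemetry API) showing how child spans nest under and finish before the query span:

```python
import time
from contextlib import contextmanager

class MiniTracer:
    """Toy tracer: records (depth, name, duration) tuples like a trace tree."""
    def __init__(self) -> None:
        self.finished: list[tuple[int, str, float]] = []
        self._depth = 0

    @contextmanager
    def span(self, name: str):
        self._depth += 1
        depth = self._depth
        start = time.monotonic()
        try:
            yield
        finally:
            self._depth -= 1
            self.finished.append((depth, name, time.monotonic() - start))

tracer = MiniTracer()
with tracer.span("agent.query"):
    with tracer.span("agent.llm_call"):
        pass
    with tracer.span("agent.tool.Bash"):
        pass

# Children finish before the parent, mirroring real span export order.
print([(d, n) for d, n, _ in tracer.finished])
# → [(2, 'agent.llm_call'), (2, 'agent.tool.Bash'), (1, 'agent.query')]
```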
Context Propagation between Agents
```python
# multi_agent_tracing.py
from claude_code_sdk import ClaudeCodeOptions, query
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from tracing import tracer  # the tracer configured above

propagator = TraceContextTextMapPropagator()

async def orchestrator_with_tracing(task: str) -> str:
    """Orchestrator that propagates trace context to its subagents."""
    with tracer.start_as_current_span("orchestrator.execute") as span:
        span.set_attribute("task.description", task[:100])
        # Capture the current context so subagents can continue the trace
        carrier: dict = {}
        propagator.inject(carrier)
        # Subagent 1: analysis
        with tracer.start_as_current_span("orchestrator.call_analyzer"):
            analysis = await call_subagent(
                prompt=f"Analyze: {task}",
                trace_context=carrier  # propagate context
            )
        # Subagent 2: implementation
        with tracer.start_as_current_span("orchestrator.call_implementer"):
            implementation = await call_subagent(
                prompt=f"Implement based on: {analysis}",
                trace_context=carrier
            )
        return implementation

async def call_subagent(prompt: str, trace_context: dict) -> str:
    """Subagent that continues the orchestrator's trace."""
    # Restore the parent's context
    ctx = propagator.extract(trace_context)
    with tracer.start_as_current_span(
        "subagent.execute",
        context=ctx  # continue the parent's trace
    ) as span:
        results = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"])
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
        return "\n".join(results)
```
Exporting to Jaeger with Docker
```yaml
# docker-compose with Jaeger
services:
  jaeger:
    image: jaegertracing/all-in-one:1.58
    ports:
      - "16686:16686"   # UI
      - "6831:6831/udp" # Thrift compact
      - "4317:4317"     # OTLP gRPC
    environment:
      COLLECTOR_OTLP_ENABLED: "true"
      SPAN_STORAGE_TYPE: badger
    volumes:
      - jaeger_data:/badger
volumes:
  jaeger_data:
```
5. Grafana Dashboards
Agent Overview
```mermaid
flowchart LR
    subgraph GrafanaDashboard["Dashboard: Claude Agent Overview"]
        direction TB
        subgraph Row1["Row 1: Activity"]
            P1["Queries/min\nStat Panel"]
            P2["Active Queries\nGauge Panel"]
            P3["Success Rate\nStat Panel"]
        end
        subgraph Row2["Row 2: Latency"]
            P4["Latency p50/p95/p99\nTime Series"]
            P5["Duration distribution\nHeatmap"]
        end
        subgraph Row3["Row 3: Cost"]
            P6["Cost USD/hour\nTime Series"]
            P7["Cost accumulated today\nStat Panel"]
            P8["Top tools by cost\nBar Chart"]
        end
        subgraph Row4["Row 4: Errors"]
            P9["Error rate\nTime Series"]
            P10["Errors by type\nPie Chart"]
        end
    end
```
Dashboard JSON for Import
```json
{
  "title": "Claude Agent Overview",
  "uid": "claude-agent-overview",
  "tags": ["claude", "agent", "ai"],
  "time": { "from": "now-6h", "to": "now" },
  "refresh": "30s",
  "panels": [
    {
      "id": 1,
      "title": "Queries per minute",
      "type": "stat",
      "gridPos": { "h": 4, "w": 4, "x": 0, "y": 0 },
      "targets": [
        {
          "expr": "rate(agent_queries_total[1m]) * 60",
          "legendFormat": "queries/min"
        }
      ],
      "options": {
        "reduceOptions": { "calcs": ["lastNotNull"] },
        "colorMode": "background",
        "thresholds": {
          "steps": [
            { "value": 0, "color": "green" },
            { "value": 10, "color": "yellow" },
            { "value": 50, "color": "red" }
          ]
        }
      }
    },
    {
      "id": 2,
      "title": "Latency p50/p95/p99",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 },
      "targets": [
        {
          "expr": "histogram_quantile(0.50, rate(agent_query_duration_seconds_bucket[5m]))",
          "legendFormat": "p50"
        },
        {
          "expr": "histogram_quantile(0.95, rate(agent_query_duration_seconds_bucket[5m]))",
          "legendFormat": "p95"
        },
        {
          "expr": "histogram_quantile(0.99, rate(agent_query_duration_seconds_bucket[5m]))",
          "legendFormat": "p99"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "s",
          "custom": { "lineWidth": 2 }
        }
      }
    },
    {
      "id": 3,
      "title": "Accumulated cost today (USD)",
      "type": "stat",
      "gridPos": { "h": 4, "w": 4, "x": 8, "y": 0 },
      "targets": [
        {
          "expr": "increase(agent_cost_usd_total[24h])",
          "legendFormat": "USD today"
        }
      ],
      "options": {
        "reduceOptions": { "calcs": ["sum"] },
        "colorMode": "background",
        "thresholds": {
          "steps": [
            { "value": 0, "color": "green" },
            { "value": 20, "color": "yellow" },
            { "value": 50, "color": "red" }
          ]
        }
      },
      "fieldConfig": {
        "defaults": { "unit": "currencyUSD" }
      }
    },
    {
      "id": 4,
      "title": "Error Rate",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 },
      "targets": [
        {
          "expr": "rate(agent_queries_total{status='error'}[5m]) / rate(agent_queries_total[5m])",
          "legendFormat": "error rate"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "percentunit",
          "thresholds": {
            "steps": [
              { "value": 0, "color": "green" },
              { "value": 0.05, "color": "red" }
            ]
          }
        }
      }
    },
    {
      "id": 5,
      "title": "Most used tools",
      "type": "barchart",
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 },
      "targets": [
        {
          "expr": "sum by (tool_name) (increase(agent_tool_calls_total[24h]))",
          "legendFormat": "{{ tool_name }}"
        }
      ]
    }
  ]
}
```
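Dashboard JSON like the above is easy to break during hand-editing, so it pays to sanity-check it in CI before importing. A small stdlib sketch (the required keys and the `validate_dashboard` helper are illustrative):

```python
import json

# Keys every panel in our dashboards is expected to carry.
REQUIRED_PANEL_KEYS = {"id", "title", "type", "gridPos", "targets"}

def validate_dashboard(raw: str) -> list[str]:
    """Return a list of problems found in a Grafana dashboard JSON string."""
    problems: list[str] = []
    dash = json.loads(raw)  # raises on malformed JSON, which is also a check
    if not dash.get("uid"):
        problems.append("dashboard has no uid")
    seen_ids = set()
    for panel in dash.get("panels", []):
        missing = REQUIRED_PANEL_KEYS - panel.keys()
        if missing:
            problems.append(f"panel {panel.get('id')} missing {sorted(missing)}")
        if panel.get("id") in seen_ids:
            problems.append(f"duplicate panel id {panel['id']}")
        seen_ids.add(panel.get("id"))
    return problems

raw = ('{"uid": "claude-agent-overview", "panels": '
       '[{"id": 1, "title": "t", "type": "stat", "gridPos": {}, "targets": []}]}')
print(validate_dashboard(raw))  # → []
```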
6. Alerting
Alerting Rules (Prometheus)
```yaml
# alerting-rules.yaml
groups:
  - name: claude-agent-critical
    interval: 30s
    rules:
      # High error rate
      - alert: AgentHighErrorRate
        expr: |
          (
            rate(agent_queries_total{status="error"}[5m])
            /
            rate(agent_queries_total[5m])
          ) > 0.05
        for: 5m
        labels:
          severity: critical
          team: platform
        annotations:
          summary: "Agent {{ $labels.agent_name }} error rate > 5%"
          description: |
            Agent {{ $labels.agent_name }} has an error rate of
            {{ $value | humanizePercentage }} over the last 5 minutes.
            Check logs: kubectl logs -n claude-agents -l app=claude-agent --tail=100
          runbook_url: "https://wiki.example.com/runbooks/agent-high-error-rate"
      # Daily cost budget exceeded
      - alert: AgentDailyCostExceeded
        expr: |
          increase(agent_cost_usd_total[24h]) > 50
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Agent daily cost exceeded $50"
          description: |
            Agent {{ $labels.agent_name }} has spent
            ${{ $value | humanize }} USD over the last 24 hours.
          runbook_url: "https://wiki.example.com/runbooks/agent-high-cost"
      # High latency
      - alert: AgentHighLatency
        expr: |
          histogram_quantile(
            0.99,
            rate(agent_query_duration_seconds_bucket[10m])
          ) > 1800
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Agent p99 latency > 30 minutes"
          description: |
            Agent {{ $labels.agent_name }} p99 latency is
            {{ $value | humanizeDuration }}. Queries may be stuck.
      # No activity (possible silent failure)
      - alert: AgentNoActivity
        expr: |
          absent(rate(agent_queries_total[15m])) == 1
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "No queries processed in 15 minutes"
          description: "The agent may be down or failing silently."
      # Job queue growing
      - alert: AgentQueueDepthHigh
        expr: agent_queue_depth > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent queue has more than 100 pending jobs"
```
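The error-rate expression above is just a ratio of per-second rates over the window. A small sketch of the same arithmetic from two samples of the two counters, to make the 5% threshold concrete:

```python
def error_rate(errors_then: int, errors_now: int,
               total_then: int, total_now: int) -> float:
    """Error ratio over a window from two samples of two counters,
    analogous to rate(errors[5m]) / rate(total[5m]) in PromQL.
    The window length cancels out of the ratio."""
    d_total = total_now - total_then
    if d_total == 0:
        return 0.0  # no traffic in the window: nothing to alert on
    return (errors_now - errors_then) / d_total

# 1500 -> 1600 queries in the window, 40 -> 48 errors: 8/100 = 8%
rate = error_rate(40, 48, 1500, 1600)
print(f"{rate:.0%}")  # → 8%
assert rate > 0.05    # AgentHighErrorRate would fire
```

Note that counter resets (process restarts) would make the deltas negative; PromQL's `rate()` handles resets for you, which is why the rules use it instead of raw subtraction.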
Notification Configuration
```yaml
# alertmanager.yaml
global:
  resolve_timeout: 5m
  slack_api_url: "${SLACK_WEBHOOK_URL}"
route:
  group_by: ["alertname", "agent_name"]
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: default
  routes:
    # Critical: page immediately via PagerDuty
    - match:
        severity: critical
      receiver: pagerduty
      continue: true
    # All alerts: Slack
    - match_re:
        severity: warning|critical
      receiver: slack
receivers:
  - name: default
    slack_configs:
      - channel: "#alerts-claude-agent"
        text: "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"
  - name: slack
    slack_configs:
      - channel: "#alerts-claude-agent"
        title: "{{ .GroupLabels.alertname }}"
        text: |
          *Severity:* {{ .CommonLabels.severity }}
          *Agent:* {{ .GroupLabels.agent_name }}
          {{ range .Alerts }}
          *Description:* {{ .Annotations.description }}
          *Runbook:* {{ .Annotations.runbook_url }}
          {{ end }}
        color: '{{ if eq .Status "firing" }}danger{{ else }}good{{ end }}'
  - name: pagerduty
    pagerduty_configs:
      - service_key: "${PAGERDUTY_SERVICE_KEY}"
        description: "{{ .GroupLabels.alertname }}: {{ .CommonAnnotations.summary }}"
```
Note that `severity` is read from `CommonLabels` rather than `GroupLabels`: since the route only groups by `alertname` and `agent_name`, `severity` is not a group label.
7. Sentry for Error Tracking
Configuring Sentry
```python
# sentry_setup.py
import os

import sentry_sdk
from claude_code_sdk import ClaudeCodeOptions, query
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.fastapi import FastApiIntegration

def configure_sentry() -> None:
    """Configures Sentry for automatic error capture."""
    sentry_sdk.init(
        dsn=os.getenv("SENTRY_DSN"),
        environment=os.getenv("ENVIRONMENT", "production"),
        release=os.getenv("APP_VERSION", "unknown"),
        traces_sample_rate=0.1,  # 10% of transactions for performance
        integrations=[
            FastApiIntegration(),
            AsyncioIntegration(),
        ],
        # Do not capture sensitive data
        before_send=filter_sensitive_data,
    )

def filter_sensitive_data(event: dict, hint: dict) -> dict | None:
    """Filters sensitive data before sending to Sentry."""
    # Remove API keys from the extras
    if "extra" in event:
        for key in list(event["extra"].keys()):
            if any(word in key.lower() for word in ["key", "token", "password", "secret"]):
                event["extra"][key] = "[FILTERED]"
    return event

async def run_agent_with_sentry(prompt: str) -> str:
    """Agent with Sentry context for better error tracking."""
    with sentry_sdk.push_scope() as scope:
        # Attach agent context to the Sentry scope
        scope.set_tag("agent.name", "production_agent")
        scope.set_context("agent_request", {
            "prompt_preview": prompt[:200],
            "prompt_length": len(prompt),
        })
        try:
            results = []
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    allowed_tools=["Read", "Bash"],
                    max_turns=30
                )
            ):
                if hasattr(message, 'session_id') and message.session_id:
                    # Attach session_id to the scope for correlation
                    scope.set_tag("session.id", message.session_id)
                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))
            return "\n".join(results)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            raise
```
8. Audit Trail
Registro Inmutable de Acciones
# audit.py
import json
import time
import hashlib
from dataclasses import dataclass, asdict
from typing import Any
import asyncpg # PostgreSQL async
@dataclass
class AuditEntry:
"""Entrada del audit trail."""
timestamp: float
entry_id: str
user_id: str | None
session_id: str | None
agent_name: str
action_type: str # "query_started", "tool_called", "query_completed", "query_failed"
action_details: dict
cost_usd: float | None
previous_hash: str # Hash del entry anterior para cadena inmutable
def to_dict(self) -> dict:
return asdict(self)
class AuditTrail:
"""Audit trail append-only con cadena de hashes para integridad."""
def __init__(self, db_pool: asyncpg.Pool):
self.db = db_pool
async def append(
self,
user_id: str | None,
session_id: str | None,
agent_name: str,
action_type: str,
action_details: dict,
cost_usd: float | None = None
) -> AuditEntry:
# Obtener hash del último entry para encadenar
last_hash = await self._get_last_hash()
entry_id = hashlib.sha256(
f"{time.time()}{session_id}{action_type}".encode()
).hexdigest()[:16]
entry = AuditEntry(
timestamp=time.time(),
entry_id=entry_id,
user_id=user_id,
session_id=session_id,
agent_name=agent_name,
action_type=action_type,
action_details=action_details,
cost_usd=cost_usd,
previous_hash=last_hash
)
# Calcular hash de este entry
entry_hash = hashlib.sha256(
json.dumps(entry.to_dict(), sort_keys=True).encode()
).hexdigest()
# Insertar en DB (append-only, no hay UPDATE ni DELETE)
await self.db.execute(
"""
INSERT INTO audit_trail
(timestamp, entry_id, user_id, session_id, agent_name,
action_type, action_details, cost_usd, previous_hash, entry_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
""",
entry.timestamp, entry.entry_id, entry.user_id,
entry.session_id, entry.agent_name, entry.action_type,
json.dumps(entry.action_details), entry.cost_usd,
entry.previous_hash, entry_hash
)
return entry
async def _get_last_hash(self) -> str:
row = await self.db.fetchrow(
"SELECT entry_hash FROM audit_trail ORDER BY timestamp DESC LIMIT 1"
)
return row["entry_hash"] if row else "genesis"
async def query_by_session(self, session_id: str) -> list[dict]:
rows = await self.db.fetch(
"""
SELECT * FROM audit_trail
WHERE session_id = $1
ORDER BY timestamp ASC
""",
session_id
)
return [dict(row) for row in rows]
Schema SQL:
-- schema.sql
CREATE TABLE audit_trail (
id BIGSERIAL PRIMARY KEY,
timestamp DOUBLE PRECISION NOT NULL,
entry_id VARCHAR(16) NOT NULL UNIQUE,
user_id VARCHAR(255),
session_id VARCHAR(255),
agent_name VARCHAR(100) NOT NULL,
action_type VARCHAR(50) NOT NULL,
action_details JSONB NOT NULL DEFAULT '{}',
cost_usd NUMERIC(10, 6),
previous_hash VARCHAR(64) NOT NULL,
entry_hash VARCHAR(64) NOT NULL UNIQUE,
-- No permitir UPDATE ni DELETE via policy
inserted_at TIMESTAMPTZ DEFAULT NOW()
);
-- Índices para consultas comunes
CREATE INDEX idx_audit_session ON audit_trail(session_id);
CREATE INDEX idx_audit_user ON audit_trail(user_id);
CREATE INDEX idx_audit_timestamp ON audit_trail(timestamp);
CREATE INDEX idx_audit_action_type ON audit_trail(action_type);
-- Row Level Security: solo INSERT, no UPDATE ni DELETE
ALTER TABLE audit_trail ENABLE ROW LEVEL SECURITY;
CREATE POLICY audit_insert_only ON audit_trail FOR INSERT TO app_user WITH CHECK (true);
CREATE POLICY audit_select_all ON audit_trail FOR SELECT TO app_user USING (true);
-- Sin políticas para UPDATE/DELETE = denegado por defecto (SELECT sí necesita su propia política para que query_by_session funcione)
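La integridad de la cadena se verifica recomputando el hash de cada entry y comparándolo con el almacenado. Un sketch autocontenido (sin DB), consistente con cómo `append` calcula `entry_hash` sobre el dict serializado con `sort_keys=True`:

```python
import hashlib
import json

def _hash_entry(entry: dict) -> str:
    """Mismo cálculo que en append(): hash del entry SIN el campo entry_hash."""
    body = {k: v for k, v in entry.items() if k != "entry_hash"}
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()

def verify_chain(entries: list[dict]) -> bool:
    """Recorre entries (orden ascendente por timestamp) validando la cadena."""
    prev = "genesis"
    for entry in entries:
        if entry["previous_hash"] != prev:
            return False  # la cadena fue rota o reordenada
        recomputed = _hash_entry(entry)
        if recomputed != entry["entry_hash"]:
            return False  # el contenido fue alterado tras insertarse
        prev = recomputed
    return True

# Cadena mínima de dos entries para ilustrar la verificación
e1 = {"action_type": "query_started", "previous_hash": "genesis"}
e1["entry_hash"] = _hash_entry(e1)
e2 = {"action_type": "tool_called", "previous_hash": e1["entry_hash"]}
e2["entry_hash"] = _hash_entry(e2)
```

Cualquier UPDATE furtivo sobre un entry antiguo invalida su hash y, por el encadenamiento, todos los posteriores.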
9. Dashboard de Costos
Tracking de Costos con Python y PostgreSQL
# cost_tracker.py
from datetime import datetime
import asyncpg
from claude_code_sdk import query, ClaudeCodeOptions
async def track_agent_with_costs(
prompt: str,
user_id: str,
agent_name: str,
db_pool: asyncpg.Pool
) -> str:
"""Ejecuta el agente y guarda métricas de costo en PostgreSQL."""
session_id = None
total_cost = 0.0
total_input_tokens = 0
total_output_tokens = 0
results = []
start_time = datetime.utcnow()
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=["Read", "Edit", "Bash"],
max_turns=50
)
):
if hasattr(message, 'session_id') and message.session_id:
session_id = message.session_id
if hasattr(message, 'cost_usd') and message.cost_usd:
total_cost += float(message.cost_usd)
if hasattr(message, 'usage'):
usage = message.usage
if hasattr(usage, 'input_tokens'):
total_input_tokens += usage.input_tokens
if hasattr(usage, 'output_tokens'):
total_output_tokens += usage.output_tokens
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
# Guardar registro de costo
await db_pool.execute(
"""
INSERT INTO agent_cost_log
(session_id, user_id, agent_name, prompt_preview, started_at,
duration_seconds, cost_usd, input_tokens, output_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
""",
session_id,
user_id,
agent_name,
prompt[:200],
start_time,
(datetime.utcnow() - start_time).total_seconds(),
total_cost,
total_input_tokens,
total_output_tokens
)
return "\n".join(results)
async def get_cost_summary(
db_pool: asyncpg.Pool,
period_days: int = 30
) -> dict:
"""Obtiene resumen de costos por agente y usuario."""
rows = await db_pool.fetch(
"""
SELECT
agent_name,
user_id,
COUNT(*) as query_count,
SUM(cost_usd) as total_cost_usd,
AVG(cost_usd) as avg_cost_usd,
AVG(duration_seconds) as avg_duration_seconds,
SUM(input_tokens) as total_input_tokens,
SUM(output_tokens) as total_output_tokens
FROM agent_cost_log
WHERE started_at > NOW() - INTERVAL '1 day' * $1
GROUP BY agent_name, user_id
ORDER BY total_cost_usd DESC
""",
period_days
)
return {
"period_days": period_days,
"agents": [dict(row) for row in rows],
"total_usd": sum(float(row["total_cost_usd"] or 0) for row in rows)
}
async def project_monthly_cost(
db_pool: asyncpg.Pool
) -> dict:
"""Proyecta el costo mensual basado en los últimos 7 días."""
row = await db_pool.fetchrow(
"""
SELECT
SUM(cost_usd) as last_7_days_cost,
COUNT(*) as last_7_days_queries
FROM agent_cost_log
WHERE started_at > NOW() - INTERVAL '7 days'
"""
)
if not row or not row["last_7_days_cost"]:
return {"projected_monthly_usd": 0, "daily_average_usd": 0}
daily_avg = float(row["last_7_days_cost"]) / 7
projected_monthly = daily_avg * 30
return {
"last_7_days_usd": float(row["last_7_days_cost"]),
"last_7_days_queries": row["last_7_days_queries"],
"daily_average_usd": round(daily_avg, 4),
"projected_monthly_usd": round(projected_monthly, 2),
}
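Con estas métricas, un guardián de presupuesto puede rechazar queries antes de superar el límite diario. Sketch mínimo en memoria (los nombres `BudgetGuard` y `daily_limit_usd` son ilustrativos; en producción el acumulado viviría en la DB o Redis):

```python
from dataclasses import dataclass

@dataclass
class BudgetGuard:
    """Límite de gasto diario por usuario o por agente."""
    daily_limit_usd: float
    spent_today_usd: float = 0.0

    def can_spend(self, estimated_cost_usd: float) -> bool:
        """True si la query estimada cabe en el presupuesto restante."""
        return self.spent_today_usd + estimated_cost_usd <= self.daily_limit_usd

    def record(self, cost_usd: float) -> None:
        """Registra el costo real tras completar la query."""
        self.spent_today_usd += cost_usd

guard = BudgetGuard(daily_limit_usd=5.0)
# Antes de lanzar el agente: if not guard.can_spend(0.50): rechazar la query
# Después de track_agent_with_costs: guard.record(total_cost)
```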
Schema SQL para costos:
-- cost_schema.sql
CREATE TABLE agent_cost_log (
id BIGSERIAL PRIMARY KEY,
session_id VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
agent_name VARCHAR(100) NOT NULL,
prompt_preview TEXT,
started_at TIMESTAMPTZ NOT NULL,
duration_seconds NUMERIC(10, 2),
cost_usd NUMERIC(10, 6) DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0
);
-- Índices
CREATE INDEX idx_cost_user ON agent_cost_log(user_id);
CREATE INDEX idx_cost_agent ON agent_cost_log(agent_name);
CREATE INDEX idx_cost_started_at ON agent_cost_log(started_at);
-- Vista para proyecciones
CREATE VIEW daily_cost_summary AS
SELECT
DATE_TRUNC('day', started_at) as day,
agent_name,
COUNT(*) as queries,
SUM(cost_usd) as total_cost_usd,
AVG(cost_usd) as avg_cost_per_query
FROM agent_cost_log
GROUP BY 1, 2
ORDER BY 1 DESC, 4 DESC;
Panel de Grafana para costos con PostgreSQL como datasource:
{
"title": "Proyección de costos mensual",
"type": "stat",
"targets": [
{
"rawSql": "SELECT SUM(cost_usd) * (30.0 / 7.0) as projected_monthly FROM agent_cost_log WHERE started_at > NOW() - INTERVAL '7 days'",
"format": "table"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD",
"thresholds": {
"steps": [
{ "value": 0, "color": "green" },
{ "value": 50, "color": "yellow" },
{ "value": 100, "color": "red" }
]
}
}
}
}
Resumen del Capítulo
La observabilidad para agentes autónomos requiere los tres pilares coordinados:
flowchart LR
Agent["Agente\nen ejecución"] --> Logs["Structlog/Pino\n→ Loki"]
Agent --> Metrics["prometheus-client\n→ Prometheus → Grafana"]
Agent --> Traces["OpenTelemetry\n→ Jaeger"]
Agent --> Errors["Sentry\n→ Error tracking"]
Agent --> Costs["PostgreSQL\n→ Cost dashboard"]
Logs --> Alerts["AlertManager\n→ Slack/PagerDuty"]
Metrics --> Alerts
Puntos clave:
- Usar bind_contextvars de structlog para que cada log incluya automáticamente query_id, session_id y user_id
- Redactar API keys y tokens SIEMPRE antes de loggear
- El costo es el KPI más crítico para agentes — trackearlo por usuario, agente y día
- Las trazas distribuidas son esenciales para debuggear agentes multi-paso
- El audit trail append-only con cadena de hashes garantiza la integridad ante auditorías
Próximo capítulo: Capítulo 19: Patrones Multi-Agente Avanzados — Reflexión, debate, ensemble y sistemas con memoria compartida.
10. Dashboard Grafana Completo
JSON Completo para Importar en Grafana
El siguiente dashboard cubre los seis paneles críticos para producción: Query Rate, latencia por percentil, costos, heatmap de herramientas, error rate y queries activas. Importar desde Dashboards → Import → Paste JSON.
{
"title": "Claude Agent — Production Dashboard",
"uid": "claude-agent-prod-v2",
"schemaVersion": 38,
"version": 1,
"tags": ["claude", "agent", "ai", "production"],
"time": { "from": "now-6h", "to": "now" },
"refresh": "30s",
"templating": {
"list": [
{
"name": "agent",
"label": "Agente",
"type": "query",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"query": "label_values(agent_queries_total, agent_name)",
"multi": true,
"includeAll": true,
"current": { "text": "All", "value": "$__all" }
},
{
"name": "model",
"label": "Modelo",
"type": "query",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"query": "label_values(agent_queries_total, model)",
"multi": false,
"includeAll": true
},
{
"name": "user_id",
"label": "Usuario",
"type": "query",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"query": "label_values(agent_queries_total, user_id)",
"multi": true,
"includeAll": true
}
]
},
"panels": [
{
"id": 10,
"title": "Query Rate (queries/min)",
"type": "stat",
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{
"expr": "sum(rate(agent_queries_total{agent_name=~\"$agent\"}[1m])) * 60",
"legendFormat": "queries/min"
}
],
"options": {
"reduceOptions": { "calcs": ["lastNotNull"] },
"colorMode": "background",
"graphMode": "area",
"orientation": "auto",
"thresholds": {
"mode": "absolute",
"steps": [
{ "value": null, "color": "green" },
{ "value": 30, "color": "yellow" },
{ "value": 100, "color": "red" }
]
}
}
},
{
"id": 11,
"title": "Latencia P50 / P95 / P99",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{
"expr": "histogram_quantile(0.50, sum by (le) (rate(agent_query_duration_seconds_bucket{agent_name=~\"$agent\"}[5m])))",
"legendFormat": "p50"
},
{
"expr": "histogram_quantile(0.95, sum by (le) (rate(agent_query_duration_seconds_bucket{agent_name=~\"$agent\"}[5m])))",
"legendFormat": "p95"
},
{
"expr": "histogram_quantile(0.99, sum by (le) (rate(agent_query_duration_seconds_bucket{agent_name=~\"$agent\"}[5m])))",
"legendFormat": "p99"
}
],
"fieldConfig": {
"defaults": {
"unit": "s",
"custom": {
"lineWidth": 2,
"fillOpacity": 10
},
"thresholds": {
"mode": "absolute",
"steps": [
{ "value": null, "color": "green" },
{ "value": 60, "color": "yellow" },
{ "value": 300, "color": "red" }
]
}
},
"overrides": [
{ "matcher": { "id": "byName", "options": "p99" }, "properties": [{ "id": "color", "value": { "fixedColor": "red", "mode": "fixed" } }] },
{ "matcher": { "id": "byName", "options": "p95" }, "properties": [{ "id": "color", "value": { "fixedColor": "orange", "mode": "fixed" } }] },
{ "matcher": { "id": "byName", "options": "p50" }, "properties": [{ "id": "color", "value": { "fixedColor": "green", "mode": "fixed" } }] }
]
}
},
{
"id": 12,
"title": "Costo USD Over Time",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{
"expr": "sum by (agent_name) (rate(agent_cost_usd_total{agent_name=~\"$agent\"}[5m])) * 3600",
"legendFormat": "{{ agent_name }} USD/hora"
},
{
"expr": "sum(increase(agent_cost_usd_total{agent_name=~\"$agent\"}[24h]))",
"legendFormat": "Total USD hoy"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD",
"custom": { "lineWidth": 2, "fillOpacity": 20 }
}
}
},
{
"id": 13,
"title": "Tool Usage Heatmap",
"type": "heatmap",
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{
"expr": "sum by (tool_name) (increase(agent_tool_calls_total{agent_name=~\"$agent\"}[1h]))",
"legendFormat": "{{ tool_name }}"
}
],
"options": {
"calculate": false,
"color": {
"scheme": "Oranges",
"steps": 64
},
"yAxis": { "axisLabel": "Herramienta" },
"tooltip": { "showHistogram": true }
}
},
{
"id": 14,
"title": "Error Rate por Agente",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 12 },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{
"expr": "sum by (agent_name) (rate(agent_queries_total{status=\"error\", agent_name=~\"$agent\"}[5m])) / sum by (agent_name) (rate(agent_queries_total{agent_name=~\"$agent\"}[5m]))",
"legendFormat": "{{ agent_name }}"
}
],
"fieldConfig": {
"defaults": {
"unit": "percentunit",
"min": 0,
"max": 1,
"thresholds": {
"mode": "absolute",
"steps": [
{ "value": null, "color": "green" },
{ "value": 0.01, "color": "yellow" },
{ "value": 0.05, "color": "red" }
]
}
}
}
},
{
"id": 15,
"title": "Active Queries (Gauge)",
"type": "gauge",
"gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"targets": [
{
"expr": "sum(agent_active_queries{agent_name=~\"$agent\"})",
"legendFormat": "activos"
}
],
"options": {
"minValue": 0,
"maxValue": 50,
"thresholds": {
"mode": "absolute",
"steps": [
{ "value": null, "color": "green" },
{ "value": 20, "color": "yellow" },
{ "value": 40, "color": "red" }
]
}
}
}
],
"annotations": {
"list": [
{
"name": "Deployments",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"expr": "changes(agent_queries_total[5m]) > 0",
"iconColor": "blue",
"hide": false,
"enable": true,
"step": "60s"
}
]
}
}
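Como alternativa al import manual, el JSON puede subirse por la API HTTP de Grafana (`POST /api/dashboards/db`). Un sketch con la librería estándar; `base_url` y el API key son parámetros de tu instalación:

```python
import json
import urllib.request

def build_import_payload(dashboard: dict, folder_id: int = 0) -> dict:
    """Arma el payload que espera POST /api/dashboards/db."""
    d = dict(dashboard)
    d["id"] = None  # id nulo: Grafana crea o actualiza según el uid
    return {"dashboard": d, "folderId": folder_id, "overwrite": True}

def upload_dashboard(base_url: str, api_key: str, dashboard: dict) -> None:
    """Sube (o actualiza) el dashboard en la instancia de Grafana."""
    req = urllib.request.Request(
        f"{base_url}/api/dashboards/db",
        data=json.dumps(build_import_payload(dashboard)).encode(),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        resp.read()
```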
Provisioning Automático via Docker/Kubernetes
Para que el dashboard se instale automáticamente al levantar Grafana:
# grafana-provisioning/dashboards/provisioner.yaml
apiVersion: 1
providers:
- name: claude-agent-dashboards
orgId: 1
type: file
disableDeletion: false
updateIntervalSeconds: 30
allowUiUpdates: true
options:
path: /var/lib/grafana/dashboards
foldersFromFilesStructure: true
# docker-compose con Grafana completo
services:
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
volumes:
- ./grafana-provisioning:/etc/grafana/provisioning
- ./grafana-dashboards:/var/lib/grafana/dashboards
- grafana_data:/var/lib/grafana
environment:
GF_SECURITY_ADMIN_PASSWORD: "${GRAFANA_PASSWORD:-admin}"
GF_USERS_ALLOW_SIGN_UP: "false"
GF_INSTALL_PLUGINS: "grafana-piechart-panel,grafana-clock-panel"
GF_FEATURE_TOGGLES_ENABLE: "publicDashboards"
depends_on:
- prometheus
prometheus:
image: prom/prometheus:v2.51.0
ports:
- "9090:9090"
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.retention.time=30d"
- "--web.enable-lifecycle"
volumes:
grafana_data:
prometheus_data:
# kubernetes/grafana-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-dashboards
namespace: monitoring
data:
claude-agent-dashboard.json: |
{ "title": "Claude Agent — Production Dashboard", ... }
---
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-datasources
namespace: monitoring
data:
datasources.yaml: |
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
url: http://prometheus:9090
isDefault: true
uid: prometheus
Alertas en Grafana (Unified Alerting)
En versiones anteriores a Grafana 9, las alertas se configuraban embebidas en el panel como JSON (formato legacy, mostrado abajo como referencia); desde Grafana 9, Unified Alerting las gestiona por separado en Alerting → Alert rules:
{
"alert": {
"name": "AgentHighErrorRate",
"message": "Error rate supera 5% en agente {{ $labels.agent_name }}",
"frequency": "1m",
"handler": 1,
"conditions": [
{
"type": "query",
"query": {
"model": {
"conditions": [
{
"evaluator": { "params": [0.05], "type": "gt" },
"operator": { "type": "and" },
"query": { "params": ["error_rate", "5m", "now"] },
"reducer": { "params": [], "type": "avg" }
}
]
}
}
}
],
"notifications": [
{ "uid": "slack-webhook-claude-agents" },
{ "uid": "pagerduty-critical" }
]
}
}
11. Distributed Tracing Avanzado con OpenTelemetry
Instrumentación Completa del Ciclo de Vida del Agente
# otel_advanced.py
"""
Instrumentación completa con OpenTelemetry: spans por herramienta,
llamadas LLM, context propagation entre agentes padre-hijo y baggage.
"""
from opentelemetry import trace, baggage, context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace import StatusCode, SpanKind
import os
import time
from claude_code_sdk import query, ClaudeCodeOptions
# ============================================================
# Setup del provider con exportadores múltiples
# ============================================================
def setup_otel(service_name: str = "claude-agent") -> trace.Tracer:
"""
Configura OpenTelemetry con:
- Exportador OTLP a Jaeger/Tempo
- Resource con información del servicio
- Propagadores W3C TraceContext + Baggage
"""
resource = Resource.create({
"service.name": service_name,
"service.version": os.getenv("APP_VERSION", "0.0.0"),
"service.instance.id": os.getenv("HOSTNAME", "local"),
"deployment.environment": os.getenv("ENVIRONMENT", "production"),
"telemetry.sdk.language": "python",
})
# OTLP exporter (compatible con Jaeger, Tempo, Honeycomb, Datadog)
otlp_exporter = OTLPSpanExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
insecure=os.getenv("ENVIRONMENT") != "production"
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
# Configurar propagadores para context propagation
from opentelemetry import propagate
propagate.set_global_textmap(CompositePropagator([
TraceContextTextMapPropagator(),
W3CBaggagePropagator(),
]))
return trace.get_tracer(service_name)
tracer = setup_otel()
# ============================================================
# Baggage: pasar metadatos a través de spans
# ============================================================
def create_agent_context(
session_id: str,
user_id: str,
request_id: str
) -> context.Context:
"""
Crea un contexto con baggage para que todos los spans
hijo hereden session_id, user_id y request_id.
"""
ctx = context.get_current()
ctx = baggage.set_baggage("session.id", session_id, context=ctx)
ctx = baggage.set_baggage("user.id", user_id, context=ctx)
ctx = baggage.set_baggage("request.id", request_id, context=ctx)
return ctx
# ============================================================
# Span por herramienta con input/output
# ============================================================
class ToolSpanManager:
"""Gestiona spans de ciclo de vida de herramientas."""
def __init__(self):
self._active_spans: dict[str, trace.Span] = {}
self._tool_start_times: dict[str, float] = {}
def start_tool(self, tool_use_id: str, tool_name: str, tool_input: dict) -> None:
"""Abre un span cuando se llama una herramienta."""
span = tracer.start_span(
f"tool.{tool_name}",
kind=SpanKind.INTERNAL,
attributes={
"tool.name": tool_name,
"tool.use_id": tool_use_id,
# Solo loggear inputs seguros (no secrets)
"tool.input.type": str(type(tool_input).__name__),
"tool.input.size": len(str(tool_input)),
}
)
self._active_spans[tool_use_id] = span
self._tool_start_times[tool_use_id] = time.monotonic()
def end_tool(
self,
tool_use_id: str,
output: str,
is_error: bool = False
) -> None:
"""Cierra el span de la herramienta con el resultado."""
span = self._active_spans.pop(tool_use_id, None)
if span is None:
return
duration = time.monotonic() - self._tool_start_times.pop(tool_use_id, time.monotonic())  # default: ahora mismo → duración 0 si no se registró el inicio
span.set_attribute("tool.output.bytes", len(output))
span.set_attribute("tool.duration_seconds", round(duration, 3))
if is_error:
span.set_status(StatusCode.ERROR, output[:200])
else:
span.set_status(StatusCode.OK)
span.end()
def close_all(self) -> None:
"""Cierra todos los spans abiertos (para cleanup en error)."""
for span in self._active_spans.values():
span.set_status(StatusCode.ERROR, "span closed by cleanup")
span.end()
self._active_spans.clear()
self._tool_start_times.clear()
# ============================================================
# Span por llamada LLM con tokens y costo
# ============================================================
async def run_with_full_tracing(
prompt: str,
agent_name: str,
session_id: str,
user_id: str,
parent_context: dict | None = None
) -> str:
"""
Ejecuta el agente con trazas completas:
- Span raíz por query
- Span por cada llamada al LLM (con modelo, tokens)
- Span por cada herramienta (con input size, output size, duración)
- Context propagation con baggage
"""
from opentelemetry import propagate
# Extraer contexto padre si viene de HTTP headers
ctx = propagate.extract(parent_context or {})
# Crear contexto con baggage
agent_ctx = create_agent_context(session_id, user_id, f"req-{int(time.time() * 1000)}")  # un request_id propio, no un prefijo del prompt (evita filtrar su contenido en headers)
# Merge de contextos
token = context.attach(agent_ctx)
tool_manager = ToolSpanManager()
results = []
llm_call_number = 0
try:
with tracer.start_as_current_span(
"agent.query",
context=ctx,
kind=SpanKind.SERVER,
attributes={
"agent.name": agent_name,
"agent.session_id": session_id,
"user.id": user_id,
"prompt.length": len(prompt),
"prompt.preview": prompt[:100],
}
) as root_span:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=["Read", "Edit", "Bash"],
max_turns=50
)
):
msg_type = type(message).__name__
# Span para cada turno del LLM
if msg_type == "AssistantMessage":
llm_call_number += 1
with tracer.start_as_current_span(
"llm.completion",
kind=SpanKind.CLIENT,
attributes={
"llm.model": "claude-opus-4-5",
"llm.call_number": llm_call_number,
"llm.vendor": "anthropic",
}
) as llm_span:
content = str(getattr(message, 'content', ''))
llm_span.set_attribute("llm.response.length", len(content))
# Inicio de tool call
elif msg_type == "ToolUseBlock":
tool_name = getattr(message, 'name', 'unknown')
tool_id = getattr(message, 'id', 'no-id')
tool_input = getattr(message, 'input', {})
tool_manager.start_tool(tool_id, tool_name, tool_input)
# Fin de tool call
elif msg_type == "ToolResultBlock":
tool_id = getattr(message, 'tool_use_id', 'no-id')
output = str(getattr(message, 'content', ''))
is_error = bool(getattr(message, 'is_error', False))
tool_manager.end_tool(tool_id, output, is_error)
# Mensaje de resultado final
elif hasattr(message, 'cost_usd') and message.cost_usd:
cost = float(message.cost_usd)
root_span.set_attribute("agent.cost_usd", cost)
usage = getattr(message, 'usage', None)
if usage:
root_span.set_attribute("llm.input_tokens",
getattr(usage, 'input_tokens', 0))
root_span.set_attribute("llm.output_tokens",
getattr(usage, 'output_tokens', 0))
root_span.set_attribute("llm.cache_read_tokens",
getattr(usage, 'cache_read_input_tokens', 0))
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
root_span.set_attribute("agent.llm_calls", llm_call_number)
root_span.set_status(StatusCode.OK)
except Exception as e:
tool_manager.close_all()
raise
finally:
context.detach(token)
return "\n".join(results)
Context Propagation entre Agentes Padre-Hijo
# otel_propagation.py
"""
Context propagation: el agente padre pasa su trace context al hijo
para que aparezcan en el mismo trace en Jaeger/Tempo.
"""
from opentelemetry import propagate, trace
from opentelemetry.trace import SpanKind
from claude_code_sdk import query, ClaudeCodeOptions
async def parent_agent(task: str) -> str:
"""Agente padre que lanza subagentes."""
with tracer.start_as_current_span(
"parent_agent.run",
kind=SpanKind.PRODUCER,
attributes={"task.description": task[:100]}
) as parent_span:
# Serializar contexto para propagarlo al hijo
carrier: dict[str, str] = {}
propagate.inject(carrier)
# El carrier ahora contiene: {"traceparent": "00-...-...-01"}
# Este header puede pasarse via HTTP, queue message, etc.
# Llamar subagente pasando el carrier
child_result = await child_agent(
prompt=f"Analiza el siguiente task: {task}",
parent_trace_carrier=carrier
)
parent_span.set_attribute("child.result.length", len(child_result))
return child_result
async def child_agent(
prompt: str,
parent_trace_carrier: dict[str, str]
) -> str:
"""Subagente que continúa la traza del padre."""
# Extraer el contexto padre del carrier
parent_ctx = propagate.extract(parent_trace_carrier)
with tracer.start_as_current_span(
"child_agent.run",
context=parent_ctx, # Continuar traza del padre
kind=SpanKind.CONSUMER,
attributes={"prompt.preview": prompt[:100]}
) as child_span:
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=["Read", "Bash"],
max_turns=20
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
child_span.set_attribute("result.chunks", len(results))
return "\n".join(results)
Exportar a Jaeger, Zipkin y Honeycomb
# otel_exporters.py
"""
Configuración de exportadores para diferentes backends.
El código del agente es idéntico — solo cambia el exporter.
"""
import os
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def get_exporter(backend: str):
"""Retorna el exportador según el backend configurado."""
if backend == "jaeger":
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
return OTLPSpanExporter(
endpoint=os.getenv("JAEGER_ENDPOINT", "http://localhost:4317"),
insecure=True
)
elif backend == "zipkin":
from opentelemetry.exporter.zipkin.json import ZipkinExporter
return ZipkinExporter(
endpoint=os.getenv("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans")
)
elif backend == "honeycomb":
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
return OTLPSpanExporter(
endpoint="https://api.honeycomb.io:443",
headers={
"x-honeycomb-team": os.environ["HONEYCOMB_API_KEY"],
"x-honeycomb-dataset": os.getenv("HONEYCOMB_DATASET", "claude-agents"),
}
)
elif backend == "tempo":
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
return OTLPSpanExporter(
endpoint=os.getenv("TEMPO_ENDPOINT", "http://localhost:4317"),
insecure=True
)
else:
# Console exporter para desarrollo
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
return ConsoleSpanExporter()
# Seleccionar backend via variable de entorno
OTEL_BACKEND = os.getenv("OTEL_BACKEND", "jaeger")
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(get_exporter(OTEL_BACKEND)))
TypeScript: Instrumentación con OpenTelemetry
// otel-agent.ts
import { trace, SpanStatusCode, SpanKind, context, propagation } from "@opentelemetry/api";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
import { Resource } from "@opentelemetry/resources";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
// Setup del tracer
const resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: "claude-agent-ts",
[SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV ?? "production",
});
const provider = new NodeTracerProvider({ resource });
provider.addSpanProcessor(
new BatchSpanProcessor(
new OTLPTraceExporter({
url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://localhost:4317",
})
)
);
provider.register();
const tracer = trace.getTracer("claude-agent-ts");
// Agente con instrumentación completa
async function instrumentedAgent(
prompt: string,
agentName: string,
parentCarrier?: Record<string, string>
): Promise<string> {
// Extraer contexto padre si existe
const parentCtx = parentCarrier
? propagation.extract(context.active(), parentCarrier)
: context.active();
return new Promise((resolve, reject) => {
context.with(parentCtx, async () => {
const span = tracer.startSpan("agent.query", {
kind: SpanKind.SERVER,
attributes: {
"agent.name": agentName,
"prompt.length": prompt.length,
"prompt.preview": prompt.slice(0, 100),
},
});
const ctx = trace.setSpan(context.active(), span);
const results: string[] = [];
let llmCalls = 0;
try {
await context.with(ctx, async () => {
for await (const message of query(prompt, {
allowedTools: ["Read", "Edit", "Bash"],
maxTurns: 50,
} as ClaudeCodeOptions)) {
const msgType = (message as { type?: string }).type;
if (msgType === "assistant") {
llmCalls++;
const llmSpan = tracer.startSpan("llm.completion", {
kind: SpanKind.CLIENT,
attributes: {
"llm.model": "claude-opus-4-5",
"llm.call_number": llmCalls,
},
});
llmSpan.setStatus({ code: SpanStatusCode.OK });
llmSpan.end();
}
if (msgType === "tool_use") {
const toolName = (message as { name?: string }).name ?? "unknown";
const toolSpan = tracer.startSpan(`tool.${toolName}`, {
kind: SpanKind.INTERNAL,
attributes: { "tool.name": toolName },
});
toolSpan.setStatus({ code: SpanStatusCode.OK });
toolSpan.end();
}
if ("cost_usd" in message) {
span.setAttribute("agent.cost_usd", Number(message.cost_usd) || 0);
}
if ("content" in message) {
const content = String((message as { content: unknown }).content);
if (content) results.push(content);
}
}
});
span.setAttribute("agent.llm_calls", llmCalls);
span.setStatus({ code: SpanStatusCode.OK });
resolve(results.join("\n"));
} catch (err) {
span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
span.recordException(err as Error);
reject(err);
} finally {
span.end();
}
});
});
}
export { instrumentedAgent, tracer };
12. Log Correlation y Búsqueda
Correlacionar Logs de Múltiples Servicios via trace_id
Cuando tienes múltiples servicios (API gateway, agente, worker), todos los logs de una misma request deben tener el mismo trace_id. Así puedes buscar todos los logs de una sesión en Loki/Elasticsearch.
# log_correlation.py
"""
Log correlation with OpenTelemetry trace_id.
Every log emitted inside a span automatically includes trace_id and span_id.
"""
import structlog
from opentelemetry import trace
from opentelemetry.trace import format_trace_id, format_span_id

def add_otel_context(logger, method, event_dict):
    """
    structlog processor that adds trace_id and span_id to the log,
    taken from the active OpenTelemetry span.
    """
    span = trace.get_current_span()
    if span and span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format_trace_id(ctx.trace_id)
        event_dict["span_id"] = format_span_id(ctx.span_id)
    return event_dict

def configure_correlated_logging():
    """
    Configure structlog with automatic trace correlation.
    Every log will carry: trace_id, span_id, timestamp, level.
    """
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.stdlib.add_log_level,
            structlog.contextvars.merge_contextvars,
            add_otel_context,  # <-- adds trace_id, span_id
            structlog.processors.JSONRenderer(),
        ],
        logger_factory=structlog.PrintLoggerFactory(),
    )
# Example log with correlation:
# {
# "timestamp": "2026-03-23T10:15:30.123Z",
# "level": "info",
# "event": "tool_called",
# "tool": "Read",
# "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
# "span_id": "00f067aa0ba902b7",
# "session_id": "abc123",
# "query_id": "d4e5f6"
# }
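Once every service emits JSON logs carrying the trace_id, correlating them is a simple group-by. A minimal stdlib-only sketch (the sample records are illustrative, following the field names of the example log above):

```python
import json
from collections import defaultdict

# Hypothetical log lines from different services, same JSON schema as above
raw_logs = [
    '{"trace_id": "4bf92f35", "span_id": "00f067aa", "event": "query_received"}',
    '{"trace_id": "deadbeef", "span_id": "11aa22bb", "event": "tool_called"}',
    '{"trace_id": "4bf92f35", "span_id": "00f067ab", "event": "query_completed"}',
]

def group_by_trace(lines: list[str]) -> dict[str, list[dict]]:
    """Group parsed JSON log lines by their trace_id."""
    traces: dict[str, list[dict]] = defaultdict(list)
    for line in lines:
        entry = json.loads(line)
        traces[entry["trace_id"]].append(entry)
    return dict(traces)

traces = group_by_trace(raw_logs)
print([e["event"] for e in traces["4bf92f35"]])
# → ['query_received', 'query_completed']
```

This is exactly what Loki or Elasticsearch do for you at scale; the snippet only shows why a shared trace_id makes the reconstruction trivial.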
Loki + Grafana for Log Search
# docker-compose with the full logging stack
services:
loki:
image: grafana/loki:2.9.0
ports:
- "3100:3100"
volumes:
- ./loki-config.yaml:/etc/loki/local-config.yaml
- loki_data:/loki
command: -config.file=/etc/loki/local-config.yaml
promtail:
image: grafana/promtail:2.9.0
volumes:
- /var/log:/var/log
- ./promtail-config.yaml:/etc/promtail/config.yml
- /var/lib/docker/containers:/var/lib/docker/containers:ro
command: -config.file=/etc/promtail/config.yml
  grafana:
    image: grafana/grafana:10.4.0
    # The Loki datasource is provisioned via the mounted file below
    volumes:
      - ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/loki.yaml
# promtail-config.yaml: scrape the agent's JSON logs
scrape_configs:
  - job_name: claude-agent
    static_configs:
      - targets: ["localhost"]
        labels:
          job: claude-agent
          __path__: /app/logs/agent*.log
    pipeline_stages:
      - json:
          expressions:
            trace_id: trace_id
            session_id: session_id
            level: level
            event: event
            agent_name: agent_name
      # Only promote low-cardinality fields to labels. trace_id and
      # session_id are nearly unique per request; indexing them as labels
      # explodes Loki's index. Query them with line filters instead.
      - labels:
          level:
          agent_name:
Useful Loki Queries (LogQL)
# All logs of a specific session
{job="claude-agent"} |= `"session_id":"abc123"`
# Error logs, formatted for reading
{job="claude-agent"} |= `"level":"error"` | json | line_format "{{.timestamp}} {{.event}} {{.error}}"
# All logs of one trace_id (view the full request)
{job="claude-agent"} |= `"trace_id":"4bf92f3577b34da6a"` | json | line_format "{{.span_id}} {{.event}}"
# Large tool outputs (output_bytes > 10000)
{job="claude-agent"} |= `"event":"tool_completed"` | json | output_bytes > 10000
# Count errors by type over the last hour
# (after `| json`, error_type is an extracted label — no unwrap needed)
sum by (error_type) (
  count_over_time({job="claude-agent"} |= `"level":"error"` | json [1h])
)
# Average latency per agent (unwrap extracts the numeric field,
# avg_over_time aggregates it over the range)
avg by (agent_name) (
  avg_over_time({job="claude-agent"} |= `"event":"query_completed"` | json | unwrap duration_seconds [5m])
)
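The same queries can be issued programmatically against Loki's HTTP API (`/loki/api/v1/query_range`). A sketch that only builds the request URL without sending it; the base URL is an assumption for a local Loki:

```python
from urllib.parse import urlencode

# Assumed local Loki endpoint (matches the docker-compose above)
LOKI_BASE = "http://localhost:3100"

def build_session_query_url(session_id: str, limit: int = 1000) -> str:
    """Build a /loki/api/v1/query_range URL for all logs of one session."""
    # Same line-filter style as the LogQL examples above
    logql = f'{{job="claude-agent"}} |= `"session_id":"{session_id}"`'
    params = urlencode({"query": logql, "limit": limit})
    return f"{LOKI_BASE}/loki/api/v1/query_range?{params}"

url = build_session_query_url("abc123")
print(url)
```

Sending the request (with any HTTP client) returns JSON with a `data.result` array of streams; the point here is only that LogQL queries are plain strings you can template.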
Elasticsearch/OpenSearch for Agent Logs
# elasticsearch_logger.py
"""
Send logs directly to Elasticsearch for full-text search.
Useful for agent reasoning logs (unstructured text).
"""
from elasticsearch import AsyncElasticsearch

# elasticsearch-py 8.x requires the scheme in the host URL
es_client = AsyncElasticsearch(hosts=["http://localhost:9200"])

INDEX_NAME = "claude-agent-logs"

async def index_agent_log(log_entry: dict) -> None:
    """Index one log entry in Elasticsearch."""
    await es_client.index(
        index=f"{INDEX_NAME}-{log_entry.get('date', 'unknown')}",
        document=log_entry,
    )

async def search_session_logs(session_id: str) -> list[dict]:
    """Find all the logs of one session."""
    result = await es_client.search(
        index=f"{INDEX_NAME}-*",
        query={"term": {"session_id.keyword": session_id}},
        sort=[{"@timestamp": {"order": "asc"}}],
        size=1000,
    )
    return [hit["_source"] for hit in result["hits"]["hits"]]

async def find_expensive_sessions(
    min_cost_usd: float = 0.10,
    last_hours: int = 24,
) -> list[dict]:
    """
    Find sessions that cost more than min_cost_usd
    during the last last_hours hours.
    """
    result = await es_client.search(
        index=f"{INDEX_NAME}-*",
        query={
            "bool": {
                "must": [
                    {"term": {"event": "query_completed"}},
                    {"range": {"@timestamp": {"gte": f"now-{last_hours}h"}}},
                    {"range": {"cost_usd": {"gte": min_cost_usd}}},
                ]
            }
        },
        aggs={
            "by_session": {
                "terms": {"field": "session_id.keyword"},
                "aggs": {"total_cost": {"sum": {"field": "cost_usd"}}},
            }
        },
        size=0,  # Aggregation-only: skip returning individual hits
    )
    return result["aggregations"]["by_session"]["buckets"]
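The aggregations above (`terms` on `session_id.keyword`, `sum` on `cost_usd`, `term` on `event`) only behave correctly if those fields are mapped as keyword/float rather than analyzed text. A possible index template — field names match the code above; everything else is an assumption:

```json
PUT _index_template/claude-agent-logs
{
  "index_patterns": ["claude-agent-logs-*"],
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "session_id": { "type": "keyword" },
        "trace_id":   { "type": "keyword" },
        "event":      { "type": "keyword" },
        "cost_usd":   { "type": "float" },
        "message":    { "type": "text" }
      }
    }
  }
}
```

Without an explicit mapping, dynamic mapping would index `event` as analyzed text, and the exact-match `term` query on it could silently return nothing.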
13. SLOs and Error Budgets for Agents
Defining SLOs for Autonomous Agents
SLOs (Service Level Objectives) for agents are harder to define than for APIs because more variables are in play: highly variable latency, per-query cost, success rate, and the quality of the result.
# slo_definitions.py
"""
SLO definitions for AI agents.
Structure: SLO = indicator + objective + time window.
"""
from dataclasses import dataclass
from enum import Enum

class SLOStatus(str, Enum):
    HEALTHY = "healthy"
    WARNING = "warning"
    BURNING = "burning"      # High burn rate: eating through the budget
    EXHAUSTED = "exhausted"  # Budget spent

@dataclass
class AgentSLO:
    name: str
    description: str
    objective: float   # 0.0 to 1.0 (e.g. 0.99 = 99%)
    window_days: int   # Evaluation window in days
    metric_query: str  # Prometheus query for the indicator (SLI)

# Production agent SLOs
AGENT_SLOS = [
    AgentSLO(
        name="availability",
        description="The agent completes 99% of queries without error",
        objective=0.99,
        window_days=30,
        metric_query="""
            sum(rate(agent_queries_total{status="success"}[30d]))
            /
            sum(rate(agent_queries_total[30d]))
        """,
    ),
    AgentSLO(
        name="latency_p95",
        description="95% of queries complete in under 60 seconds",
        objective=0.95,
        window_days=30,
        # SLI as a ratio: the fraction of queries under the 60s threshold.
        # (Comparing histogram_quantile against 60 yields a boolean, not an SLI.)
        metric_query="""
            sum(rate(agent_query_duration_seconds_bucket{le="60"}[30d]))
            /
            sum(rate(agent_query_duration_seconds_count[30d]))
        """,
    ),
    AgentSLO(
        name="cost_per_query",
        description="Average cost per query stays under $0.10",
        objective=0.95,
        window_days=7,
        # SLI: average cost per query; compare against $0.10 when evaluating
        metric_query="""
            sum(rate(agent_cost_usd_total[7d]))
            /
            sum(rate(agent_queries_total[7d]))
        """,
    ),
]
Calculating the Error Budget
# error_budget.py
"""
Error budget: how much "error" you can afford within the window.
For a 99% SLO over 30 days:
- Total minutes: 30 * 24 * 60 = 43,200 minutes
- Error budget: (1 - 0.99) * 43,200 = 432 minutes of allowed downtime
"""
from typing import NamedTuple

from slo_definitions import SLOStatus

class ErrorBudgetStatus(NamedTuple):
    slo_name: str
    objective: float
    current_sli: float             # Current value of the indicator
    error_budget_total: float      # Total budget for the window
    error_budget_remaining: float  # Remaining budget
    burn_rate: float               # How fast the budget is being consumed
    status: SLOStatus

def calculate_error_budget(
    objective: float,
    current_sli: float,
    window_minutes: int,
    slo_name: str = "availability",
) -> ErrorBudgetStatus:
    """
    Compute the state of the error budget.
    Args:
        objective: target SLO (e.g. 0.99)
        current_sli: current indicator value (e.g. 0.987)
        window_minutes: window length in minutes
    """
    allowed_failure_rate = 1.0 - objective
    actual_failure_rate = 1.0 - current_sli
    total_budget = allowed_failure_rate * window_minutes
    consumed = actual_failure_rate * window_minutes
    remaining = total_budget - consumed
    burn_rate = actual_failure_rate / allowed_failure_rate if allowed_failure_rate > 0 else 0.0
    if remaining <= 0:
        status = SLOStatus.EXHAUSTED  # Nothing left, regardless of burn rate
    elif burn_rate < 1.0:
        status = SLOStatus.HEALTHY
    elif burn_rate < 2.0:
        status = SLOStatus.WARNING
    else:
        status = SLOStatus.BURNING
    return ErrorBudgetStatus(
        slo_name=slo_name,
        objective=objective,
        current_sli=current_sli,
        error_budget_total=total_budget,
        error_budget_remaining=remaining,
        burn_rate=burn_rate,
        status=status,
    )

# Example:
# SLO 99%, 30-day window (43,200 min), current SLI 98.7%
# budget = 0.01 * 43200 = 432 min
# consumed = 0.013 * 43200 = 561.6 min --> budget exhausted!
Burn-Rate-Based Alerts
Burn-rate alerts are more precise than simple threshold alerts because they catch trends before the budget runs out.
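The multipliers in burn-rate rules follow directly from the window arithmetic: a burn rate of B exhausts a W-hour budget in W/B hours. A quick check of where 14.4x and 3x come from:

```python
# With a 30-day window (720 h), the burn rate that would exhaust the
# whole budget in ~50 h is 720 / 50 = 14.4; in 10 days it is 720 / 240 = 3.
def burn_rate_for_exhaustion(window_hours: float, exhaust_hours: float) -> float:
    """Burn rate at which the full error budget is consumed in exhaust_hours."""
    return window_hours / exhaust_hours

print(burn_rate_for_exhaustion(30 * 24, 50))   # → 14.4
print(burn_rate_for_exhaustion(30 * 24, 240))  # → 3.0
```

The 50-hour and 10-day horizons are the conventional fast/slow pairing from SRE practice; pick horizons that match how quickly your team can respond.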
# slo-alerts.yaml
groups:
  - name: slo-error-budget
    rules:
      # Alert 1: fast burn over 1h (14.4x exhausts a 30-day budget in ~2 days)
      - alert: AgentSLOBurnRateHigh
        expr: |
          (
            rate(agent_queries_total{status="error"}[1h])
            /
            rate(agent_queries_total[1h])
          ) > 14.4 * 0.01
        for: 5m
        labels:
          severity: critical
          slo: availability
        annotations:
          summary: "Availability SLO burn rate is 14.4x (budget gone in ~2 days)"
          description: |
            At the current error rate, the monthly error budget will be
            exhausted in about 2 days.
            Current error rate: {{ $value | humanizePercentage }}
            Target error rate: 1%
      # Alert 2: moderate burn over 6h (3x exhausts the budget in ~10 days)
      - alert: AgentSLOBurnRateModerate
        expr: |
          (
            rate(agent_queries_total{status="error"}[6h])
            /
            rate(agent_queries_total[6h])
          ) > 3 * 0.01
        for: 15m
        labels:
          severity: warning
          slo: availability
        annotations:
          summary: "SLO burn rate is 3x — investigate before it becomes critical"
      # Alert 3: less than 10% of the budget remaining
      # (a 30-day error rate above 0.9% means >90% of a 1% budget is spent)
      - alert: AgentErrorBudgetAlmostExhausted
        expr: |
          1 - (
            sum(rate(agent_queries_total{status="success"}[30d]))
            /
            sum(rate(agent_queries_total[30d]))
          ) > 0.009
        for: 1h
        labels:
          severity: critical
        annotations:
          summary: "Less than 10% of the monthly error budget remains"
SLO Compliance Dashboard
{
  "title": "SLO Compliance — Claude Agent",
  "panels": [
    {
      "title": "Availability (SLO: 99%)",
      "type": "stat",
      "targets": [
        {
          "expr": "sum(rate(agent_queries_total{status='success'}[30d])) / sum(rate(agent_queries_total[30d]))",
          "legendFormat": "Availability"
        }
      ],
      "options": {
        "thresholds": {
          "steps": [
            { "value": null, "color": "red" },
            { "value": 0.99, "color": "green" }
          ]
        }
      },
      "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0.98, "max": 1 } }
    },
    {
      "title": "Error Budget Remaining (30d)",
      "type": "gauge",
      "targets": [
        {
          "expr": "1 - (sum(rate(agent_queries_total{status='error'}[30d])) / sum(rate(agent_queries_total[30d]))) / 0.01",
          "legendFormat": "Remaining budget"
        }
      ],
      "options": {
        "thresholds": {
          "steps": [
            { "value": null, "color": "red" },
            { "value": 0.1, "color": "orange" },
            { "value": 0.5, "color": "yellow" },
            { "value": 0.8, "color": "green" }
          ]
        }
      },
      "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0, "max": 1 } }
    }
  ]
}
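The budget-remaining gauge expression reduces to `1 - error_rate / (1 - objective)`. A quick sanity check of that arithmetic:

```python
# Fraction of the error budget still available, given the observed error
# rate over the window and the SLO objective (here the 99% SLO above).
def budget_remaining(error_rate: float, objective: float = 0.99) -> float:
    """Fraction of the error budget still available (can go negative)."""
    allowed = 1.0 - objective
    return 1.0 - error_rate / allowed

print(round(budget_remaining(0.003), 2))  # 0.3% errors → 0.7 (70% left)
print(round(budget_remaining(0.01), 2))   # 1% errors → 0.0 (exhausted)
```

A value below zero means the SLO has already been violated for the window, which is exactly what the red zone of the gauge shows.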
14. Chaos Engineering for Agents
Testing Resilience by Injecting Failures
Chaos engineering verifies that your resilience mechanisms (retry, circuit breaker, fallback) actually work before a real failure hits production.
# chaos_tools.py
"""
Chaos engineering for agents: tools that fail on purpose
so we can verify the system recovers correctly.
"""
import random
import asyncio
import time
from functools import wraps
from typing import Callable

class ChaosConfig:
    """Configuration for the chaos engine."""
    def __init__(
        self,
        failure_rate: float = 0.20,            # 20% chance of failure
        latency_injection_rate: float = 0.15,  # 15% chance of extra latency
        extra_latency_seconds: float = 10.0,   # Maximum injected latency
        enabled: bool = True,
    ):
        self.failure_rate = failure_rate
        self.latency_injection_rate = latency_injection_rate
        self.extra_latency_seconds = extra_latency_seconds
        self.enabled = enabled

class ChaosMonkey:
    """
    Wraps tool functions to inject failures.
    For testing only — NEVER in production.
    """
    def __init__(self, config: ChaosConfig):
        self.config = config
        self._injected_failures: list[dict] = []
        self._injected_latencies: list[dict] = []

    def chaotic_tool(self, tool_name: str, original_func: Callable) -> Callable:
        """
        Wrap a tool function with failure injection.
        """
        config = self.config
        monkey = self

        @wraps(original_func)
        async def wrapper(*args, **kwargs):
            if not config.enabled:
                return await original_func(*args, **kwargs)
            # Inject latency
            if random.random() < config.latency_injection_rate:
                latency = random.uniform(1.0, config.extra_latency_seconds)
                monkey._injected_latencies.append({
                    "tool": tool_name,
                    "latency_seconds": round(latency, 2),
                    "timestamp": time.time(),
                })
                print(f"[CHAOS] Injecting {latency:.1f}s of latency into {tool_name}")
                await asyncio.sleep(latency)
            # Inject failure
            if random.random() < config.failure_rate:
                error_msg = f"[CHAOS] Injected failure in {tool_name}"
                monkey._injected_failures.append({
                    "tool": tool_name,
                    "error": error_msg,
                    "timestamp": time.time(),
                })
                print(error_msg)
                raise RuntimeError(error_msg)
            return await original_func(*args, **kwargs)
        return wrapper

    @property
    def failure_summary(self) -> dict:
        return {
            "total_failures": len(self._injected_failures),
            "total_latencies": len(self._injected_latencies),
            "failures_by_tool": {
                f["tool"]: sum(1 for x in self._injected_failures if x["tool"] == f["tool"])
                for f in self._injected_failures
            },
        }
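A chaos wrapper itself is easiest to validate at the extremes of failure_rate, where its behavior is deterministic. A self-contained sketch (independent of the ChaosMonkey class above; `fake_tool` is a stand-in):

```python
import asyncio
import random

def with_chaos(func, failure_rate: float):
    """Wrap an async function so it fails with probability failure_rate."""
    async def wrapper(*args, **kwargs):
        if random.random() < failure_rate:
            raise RuntimeError("[CHAOS] injected failure")
        return await func(*args, **kwargs)
    return wrapper

async def fake_tool() -> str:
    return "ok"

async def main() -> tuple[int, int]:
    # random.random() is always < 1.0 and never < 0.0, so these two
    # wrappers behave deterministically: one always fails, one never does.
    always_fail = with_chaos(fake_tool, failure_rate=1.0)
    never_fail = with_chaos(fake_tool, failure_rate=0.0)
    failures = successes = 0
    for _ in range(10):
        try:
            await always_fail()
        except RuntimeError:
            failures += 1
        if await never_fail() == "ok":
            successes += 1
    return failures, successes

print(asyncio.run(main()))  # → (10, 10)
```

Intermediate rates are then just a matter of statistics; asserting on exact counts for, say, failure_rate=0.2 would make the test flaky.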
Chaos Hook — A Tool That Fails Randomly
# chaos_hook.py
"""
PreToolUse hook that injects failures into specific tools.
Integrates with the SDK via the hook system.
"""
import json
import os

def create_chaos_pre_tool_hook(
    tools_to_chaos: list[str],
    failure_rate: float = 0.20,
    only_in_environments: list[str] | None = None,
) -> str:
    """
    Generate the hook script that injects failures.
    Returns the path to the generated script.
    """
    env_check = ""
    if only_in_environments:
        envs = json.dumps(only_in_environments)
        env_check = f"""
import os
allowed_envs = {envs}
current_env = os.getenv("ENVIRONMENT", "production")
if current_env not in allowed_envs:
    import sys; sys.exit(0)  # Never apply chaos in production
"""
    script_content = f"""#!/usr/bin/env python3
\"\"\"
Chaos Engineering Hook — PreToolUse
Testing ONLY — do NOT use in production
\"\"\"
import sys
import json
import random
{env_check}
hook_input = json.loads(sys.stdin.read())
tool_name = hook_input.get("tool_name", "")
chaos_tools = {json.dumps(tools_to_chaos)}
failure_rate = {failure_rate}
if tool_name in chaos_tools and random.random() < failure_rate:
    # Return an error to the agent (the agent must handle the failure)
    output = {{
        "decision": "block",
        "reason": f"[CHAOS] Injected failure in tool {{tool_name}} for resilience testing"
    }}
    print(json.dumps(output))
    sys.exit(0)
# No failure: let the call through
print(json.dumps({{"decision": "approve"}}))
"""
    hook_path = "/tmp/chaos_pre_tool_hook.py"
    with open(hook_path, "w") as f:
        f.write(script_content)
    os.chmod(hook_path, 0o755)
    return hook_path
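To activate the generated hook, it must be registered in the project's Claude Code settings. A sketch of the expected shape — the matcher and path are examples, and the exact hook schema should be verified against the SDK version in use:

```json
{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Bash|Write",
        "hooks": [
          { "type": "command", "command": "/tmp/chaos_pre_tool_hook.py" }
        ]
      }
    ]
  }
}
```

Restricting the matcher to a couple of tools keeps the blast radius of a chaos run small.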
Chaos Test Suite
# chaos_test_suite.py
"""
Full chaos-engineering suite for the agent.
Verifies that the resilience mechanisms actually work.
"""
import asyncio

class ChaosTestSuite:
    """Test suite that verifies resilience under chaotic conditions."""

    async def test_retry_on_transient_failure(self) -> bool:
        """
        Verify that the agent retries correctly when a tool
        fails transiently.
        """
        try:
            # An agent wrapped with retry should absorb a transient failure
            from your_agent_module import agent_with_retry  # noqa
            result = await agent_with_retry(
                prompt="test task",
                tools=["Read"],
                max_retries=3,
                base_delay=0.1,  # Short delay for tests
            )
            assert result is not None
            print("✓ Retry test passed")
            return True
        except Exception as e:
            print(f"✗ Retry test failed: {e}")
            return False

    async def test_circuit_breaker_opens_on_failures(self) -> bool:
        """
        Verify that the circuit breaker opens after N consecutive failures.
        """
        from your_agent_module import AgentCircuitBreaker, CircuitState  # noqa
        breaker = AgentCircuitBreaker(
            failure_threshold=3,
            reset_timeout=1.0,
        )
        # Force 3 consecutive failures
        for _ in range(3):
            breaker._on_failure()
        assert breaker.state == CircuitState.OPEN, f"Expected OPEN, got {breaker.state}"
        # Calling with the circuit open must fail immediately
        try:
            await breaker.call("test", ["Read"])
            print("✗ Circuit breaker should have rejected the call")
            return False
        except RuntimeError as e:
            if "ABIERTO" in str(e) or "OPEN" in str(e):
                print("✓ Circuit breaker test passed")
                return True
            raise

    async def test_cost_budget_enforced(self) -> bool:
        """
        Verify that the agent stops when the cost budget is exceeded.
        """
        # The CostBudgetPlugin should raise an error
        print("✓ Cost budget test requires integration with PluggableAgentRunner")
        return True

    async def run_all(self) -> dict[str, bool]:
        """Run the whole suite and return the results."""
        results = {}
        tests = [
            ("retry_on_transient_failure", self.test_retry_on_transient_failure),
            ("circuit_breaker_opens", self.test_circuit_breaker_opens_on_failures),
            ("cost_budget_enforced", self.test_cost_budget_enforced),
        ]
        for test_name, test_func in tests:
            print(f"\nRunning: {test_name}")
            try:
                results[test_name] = await test_func()
            except Exception as e:
                print(f"✗ {test_name} raised exception: {e}")
                results[test_name] = False
        passed = sum(1 for r in results.values() if r)
        print(f"\n{'='*40}")
        print(f"Result: {passed}/{len(results)} tests passed")
        return results

# Run the suite
if __name__ == "__main__":
    suite = ChaosTestSuite()
    asyncio.run(suite.run_all())
TypeScript: Chaos Testing
// chaos-test.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface ChaosConfig {
  failureRate: number;
  latencyMs: number;
  latencyRate: number;
}

function wrapWithChaos<T extends (...args: unknown[]) => Promise<unknown>>(
  fn: T,
  config: ChaosConfig
): T {
  return (async (...args: unknown[]) => {
    // Inject latency
    if (Math.random() < config.latencyRate) {
      const latency = Math.random() * config.latencyMs;
      console.log(`[CHAOS] Injecting ${latency.toFixed(0)}ms latency`);
      await new Promise((r) => setTimeout(r, latency));
    }
    // Inject failure
    if (Math.random() < config.failureRate) {
      console.log("[CHAOS] Injecting failure");
      throw new Error("[CHAOS] Injected failure for resilience testing");
    }
    return fn(...args);
  }) as T;
}

async function runChaosTest(): Promise<void> {
  const config: ChaosConfig = {
    failureRate: 0.3, // 30% failures
    latencyMs: 5000,  // up to 5 extra seconds
    latencyRate: 0.2, // 20% of calls get extra latency
  };
  let attempts = 0;
  let successes = 0;
  const totalRuns = 10;
  for (let i = 0; i < totalRuns; i++) {
    attempts++;
    try {
      // Wrap the query run with chaos so each iteration may fail or lag
      const chaoticRun = wrapWithChaos(async () => {
        const results: string[] = [];
        for await (const message of query("Say hello briefly", {
          allowedTools: [],
          maxTurns: 1,
        } as ClaudeCodeOptions)) {
          if ("content" in message) {
            results.push(String((message as { content: unknown }).content));
          }
        }
        return results;
      }, config);
      await chaoticRun();
      successes++;
    } catch (e) {
      console.log(`[CHAOS] Run ${i + 1} failed: ${e}`);
    }
  }
  const successRate = successes / attempts;
  console.log(`\nChaos test results:`);
  console.log(`  Attempts: ${attempts}`);
  console.log(`  Successes: ${successes}`);
  console.log(`  Success rate: ${(successRate * 100).toFixed(1)}%`);
}

runChaosTest().catch(console.error);
15. Profiling and Flame Graphs
Where the Agent Spends Its Time
flowchart LR
subgraph TimeBreakdown["Typical time breakdown of a query"]
LLM["Waiting for the LLM response\n~60-75% of total time"]
Tools["Executing tools\n~15-25% of total time"]
SDK["SDK overhead\n(JSON parsing, Python)\n~2-5% of total time"]
Network["Network latency\n~5-10% of total time"]
end
Profiling with cProfile and py-spy
# profiling_agent.py
"""
Performance profiling of the Python code that uses the SDK.
Identifies which part of the code (not the LLM) takes the most time.
"""
import cProfile
import pstats
import asyncio
import io
import json
from claude_code_sdk import query, ClaudeCodeOptions

async def agent_workload():
    """The agent's real workload."""
    results = []
    async for message in query(
        prompt="List the files in the current directory",
        options=ClaudeCodeOptions(
            allowed_tools=["Bash"],
            max_turns=5,
        ),
    ):
        if hasattr(message, "content") and message.content:
            results.append(str(message.content))
        # Simulate per-message post-processing
        _process_message(message)
    return results

def _process_message(message) -> None:
    """Processing that can get expensive with many messages."""
    # Serialize for auditing (a potential bottleneck on large messages)
    try:
        _ = json.dumps(str(message))
    except Exception:
        pass

def profile_agent_sync():
    """
    Synchronous profile of the agent.
    cProfile works best around a synchronous entry point.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    result = asyncio.run(agent_workload())
    profiler.disable()
    # Generate the report
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative")
    stats.print_stats(30)  # Top 30 slowest functions
    print("\n=== PROFILING REPORT ===")
    print(stream.getvalue())
    return result

# Run the profiler
if __name__ == "__main__":
    profile_agent_sync()
py-spy para Profiling de Procesos en Ejecución
# Instalar py-spy
pip install py-spy
# Profiling de un proceso en ejecución (PID)
# Útil para procesos que ya están corriendo en producción
py-spy record -o agent_profile.svg --pid $(pgrep -f "python agent.py") --duration 30
# Profiling desde el inicio del proceso
py-spy record -o agent_profile.svg -- python agent.py
# Top en tiempo real (como `top` pero para funciones Python)
py-spy top --pid $(pgrep -f "python agent.py")
# Dump del stack trace actual (para ver dónde está atascado)
py-spy dump --pid $(pgrep -f "python agent.py")
Reading the Flame Graph
flowchart TD
subgraph FlameGraph["How to read a flame graph (SVG)"]
direction TB
Base["Bottom of the graph = outermost calls\n(main, asyncio.run, etc.)"]
Middle["Middle layers = the call chain"]
Top["Flame tips = functions consuming the most CPU"]
Base --> Middle
Middle --> Top
end
subgraph Interpretation["What to look for"]
Wide["Wide flame = slow function, or one called many times"]
Flat["Wide, flat flame = a real bottleneck"]
Tall["Tall, narrow flame = deep call stack, not necessarily slow"]
end
Profiling Asíncrono con austin
# austin: profiler de bajo overhead para Python async
pip install austin-dist
# Profiling de código async (asyncio)
austin -i 1000 python agent.py | austin-web
# Alternatively, usar asyncio debug mode para encontrar coroutines lentas
PYTHONASYNCIODEBUG=1 python agent.py 2>&1 | grep "took longer than"
Finding Bottlenecks in Message Processing
# message_processing_profiler.py
"""
Profiler focused on SDK message processing.
Tells you whether the bottleneck is your Python processing
or the wait for the LLM.
"""
import time
import asyncio
from collections import defaultdict
from claude_code_sdk import query, ClaudeCodeOptions

class MessageTimingProfiler:
    """Measures how much time is spent on each message type."""

    def __init__(self):
        self.timings: dict[str, list[float]] = defaultdict(list)
        self.between_messages: list[float] = []
        self._last_message_time: float | None = None

    async def profile(self, prompt: str) -> dict:
        """Profile message processing for one query."""
        start = time.monotonic()
        message_count = 0
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(
                allowed_tools=["Read", "Bash"],
                max_turns=20,
            ),
        ):
            now = time.monotonic()
            message_count += 1
            # Time between messages = time waiting for the LLM/tool
            if self._last_message_time is not None:
                self.between_messages.append(now - self._last_message_time)
            # Python processing time per message type
            msg_type = type(message).__name__
            proc_start = time.monotonic()
            # Simulate processing
            _ = str(message)
            proc_time = time.monotonic() - proc_start
            self.timings[msg_type].append(proc_time)
            self._last_message_time = time.monotonic()
        total = time.monotonic() - start
        python_time = sum(sum(v) for v in self.timings.values())
        wait_time = sum(self.between_messages)
        return {
            "total_seconds": round(total, 3),
            "python_processing_seconds": round(python_time, 3),
            "waiting_for_llm_or_tools_seconds": round(wait_time, 3),
            "python_overhead_percent": round(python_time / total * 100, 1),
            "message_count": message_count,
            "timing_by_message_type": {
                k: {
                    "count": len(v),
                    "avg_ms": round(sum(v) / len(v) * 1000, 3),
                    "max_ms": round(max(v) * 1000, 3),
                }
                for k, v in self.timings.items()
            },
        }

async def main():
    profiler = MessageTimingProfiler()
    report = await profiler.profile("List the Python files in the current directory")
    print("\n=== AGENT MESSAGE PROCESSING PROFILE ===")
    print(f"Total time: {report['total_seconds']}s")
    print(f"Python time: {report['python_processing_seconds']}s ({report['python_overhead_percent']}%)")
    print(f"Waiting on LLM/tools: {report['waiting_for_llm_or_tools_seconds']}s")
    print(f"Total messages: {report['message_count']}")
    print("\nBy message type:")
    for msg_type, timing in report["timing_by_message_type"].items():
        print(f"  {msg_type}: {timing['count']} msgs, avg={timing['avg_ms']}ms, max={timing['max_ms']}ms")

if __name__ == "__main__":
    asyncio.run(main())