Chapter 18: Monitoring and Observability

By: Artiko


An autonomous agent that fails silently is undetectable without good observability. Unlike web services, where an error surfaces immediately, an agent can execute 20 tools, spend $0.50 in tokens, and produce an incorrect result without any traditional monitoring system noticing. This chapter builds a complete observability system for agents.


1. The 3 Pillars: Logs, Metrics, Traces

Applied to AI Agents

flowchart TB
    subgraph Pillars["The 3 Pillars of Observability"]
        direction LR

        subgraph Logs["Logs (what happened?)"]
            L1["Query received\nsession_id: abc"]
            L2["Tool called: Bash\ninput: ls -la"]
            L3["Tool result: OK\noutput: 3 files"]
            L4["Query completed\ncost: $0.05"]
        end

        subgraph Metrics["Metrics (how much/when?)"]
            M1["queries_total: 1547"]
            M2["latency_p99: 45s"]
            M3["cost_usd_total: $23.4"]
            M4["error_rate: 1.2%"]
        end

        subgraph Traces["Traces (why was it slow?)"]
            T1["Span: query total\n45.2s"]
            T2["Span: LLM call 1\n3.1s"]
            T3["Span: Bash tool\n0.5s"]
            T4["Span: LLM call 2\n2.8s"]
        end
    end

    subgraph Stack["Recommended Stack"]
        S1["Structlog / Pino"]
        S2["Prometheus + Grafana"]
        S3["OpenTelemetry + Jaeger"]
    end

    Logs --> S1
    Metrics --> S2
    Traces --> S3

Why Observability Is Critical for Autonomous Agents

Autonomous agents have characteristics that make observability especially critical:

  1. Long duration: A 30-minute session can fail at minute 28, losing all the work and the cost already incurred.
  2. Variable cost: A prompt bug can send the agent into an infinite tool-calling loop, racking up $50 in minutes.
  3. Non-determinism: The same prompt can produce different results. Without logs of the reasoning, debugging is impossible.
  4. Real side effects: The agent can modify files, execute code, send emails. Errors have real consequences.
  5. Multi-step: An error at step 15 of 20 can be caused by a bad decision at step 3. Distributed traces are the only way to understand this.
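Point 2 above is the easiest to defend against in code: a hard budget that aborts the agent loop once cumulative spend crosses a limit. A minimal sketch (the `CostBudget` class and its limits are illustrative, not part of the SDK):

```python
class BudgetExceededError(RuntimeError):
    """Raised when cumulative cost crosses the configured limit."""


class CostBudget:
    """Tracks cumulative USD cost and aborts the agent loop past a hard limit."""

    def __init__(self, limit_usd: float) -> None:
        self.limit_usd = limit_usd
        self.spent_usd = 0.0

    def charge(self, cost_usd: float) -> None:
        self.spent_usd += cost_usd
        if self.spent_usd > self.limit_usd:
            raise BudgetExceededError(
                f"spent ${self.spent_usd:.2f}, limit ${self.limit_usd:.2f}"
            )


budget = CostBudget(limit_usd=1.00)
budget.charge(0.40)   # fine: $0.40 total
budget.charge(0.50)   # fine: $0.90 total
try:
    budget.charge(0.20)  # crosses $1.00, raises
except BudgetExceededError as e:
    print("aborted:", e)
```

In the agent loop you would call `budget.charge(message.cost_usd)` on every message that reports cost, so a runaway loop dies within one turn of crossing the limit.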

Recommended Stack

| Pillar  | Python Tool        | TypeScript Tool          | Backend              |
|---------|--------------------|--------------------------|----------------------|
| Logs    | structlog          | pino                     | Loki + Grafana       |
| Metrics | prometheus-client  | prom-client              | Prometheus + Grafana |
| Traces  | opentelemetry-sdk  | @opentelemetry/sdk-node  | Jaeger / Tempo       |
| Errors  | sentry-sdk         | @sentry/node             | Sentry Cloud         |
| Costs   | Custom counter     | Custom counter           | Grafana + PostgreSQL |

2. Structured Logging

Python with structlog

# logging_setup.py
import logging
import sys
import structlog


def configure_logging(level: str = "INFO", json_output: bool = True) -> None:
    """
    Configures structlog for structured logging.

    In production use json_output=True so the log aggregator
    (Loki, CloudWatch, etc.) can parse and filter fields.
    """
    processors = [
        # Add an ISO 8601 timestamp
        structlog.processors.TimeStamper(fmt="iso"),
        # Add the level as a string
        structlog.stdlib.add_log_level,
        # Add the logger name
        structlog.stdlib.add_logger_name,
        # Merge fields bound via contextvars
        structlog.contextvars.merge_contextvars,
        # Add stack traces on exceptions
        structlog.processors.StackInfoRenderer(),
        structlog.processors.ExceptionRenderer(),
    ]

    if json_output:
        processors.append(structlog.processors.JSONRenderer())
    else:
        # For development: human-readable output
        processors.append(structlog.dev.ConsoleRenderer())

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            logging.getLevelName(level)
        ),
        logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
        cache_logger_on_first_use=True,
    )

Usage with automatic per-query context:

# agent_logger.py
import structlog
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import uuid
import time

logger = structlog.get_logger()


@asynccontextmanager
async def agent_logging_context(
    session_id: str | None = None,
    user_id: str | None = None,
    agent_name: str = "default"
) -> AsyncGenerator[structlog.BoundLogger, None]:
    """
    Context manager that automatically attaches context to every log
    emitted inside the block.
    """
    from structlog.contextvars import bind_contextvars, clear_contextvars

    query_id = str(uuid.uuid4())[:8]
    start_time = time.monotonic()

    # Bind context: every log inside the block will include these fields
    bind_contextvars(
        query_id=query_id,
        session_id=session_id,
        user_id=user_id,
        agent_name=agent_name
    )

    log = logger.bind(query_id=query_id)

    try:
        log.info("query_started", agent=agent_name)
        yield log
        duration = time.monotonic() - start_time
        log.info("query_completed", duration_seconds=round(duration, 2))
    except Exception as e:
        duration = time.monotonic() - start_time
        log.error(
            "query_failed",
            error=str(e),
            error_type=type(e).__name__,
            duration_seconds=round(duration, 2)
        )
        raise
    finally:
        clear_contextvars()
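Under the hood, `bind_contextvars`/`clear_contextvars` rest on Python's standard `contextvars` module. A stdlib-only sketch of that mechanism (the `bind`/`clear`/`log` helpers here are illustrative, not structlog's actual API):

```python
import contextvars
import json

# A single ContextVar holds the fields bound for the current task/request.
_log_context: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "log_context", default={}
)


def bind(**fields) -> None:
    """Merge fields into the current context (like bind_contextvars)."""
    _log_context.set({**_log_context.get(), **fields})


def clear() -> None:
    """Drop all bound fields (like clear_contextvars)."""
    _log_context.set({})


def log(event: str, **fields) -> str:
    """Emit one JSON log line that merges the bound context automatically."""
    record = {**_log_context.get(), "event": event, **fields}
    return json.dumps(record, sort_keys=True)


bind(query_id="abc123", agent_name="default")
line = log("tool_called", tool="Bash")
print(line)  # includes query_id and agent_name without passing them explicitly
clear()
```

Because `ContextVar` values are per-async-task, two concurrent queries each see only their own bound fields, which is exactly why this pattern is safe in an `async` agent server.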

Logging each tool call:

# tool_logger.py
import time
import structlog
from claude_code_sdk import query, ClaudeCodeOptions

from agent_logger import agent_logging_context

logger = structlog.get_logger()

def _redact_sensitive(text: str) -> str:
    """Redacts sensitive data from logs."""
    import re
    # Redact API keys
    text = re.sub(r'sk-ant-[a-zA-Z0-9-]+', 'sk-ant-[REDACTED]', text)
    # Redact passwords
    text = re.sub(r'password["\s:=]+[^\s"]+', 'password=[REDACTED]', text, flags=re.IGNORECASE)
    # Redact JWT tokens
    text = re.sub(r'eyJ[a-zA-Z0-9._-]+', '[JWT_REDACTED]', text)
    return text


async def run_agent_with_tool_logging(
    prompt: str,
    agent_name: str = "default"
) -> str:
    """Runs the agent, logging every tool call with its input/output."""
    results = []
    tool_calls = []

    async with agent_logging_context(agent_name=agent_name) as log:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(
                allowed_tools=["Read", "Edit", "Bash"]
            )
        ):
            message_type = type(message).__name__

            if message_type == "AssistantMessage":
                # Log the LLM reasoning (truncated)
                content = str(getattr(message, 'content', ''))
                log.debug(
                    "llm_response",
                    preview=content[:200] + "..." if len(content) > 200 else content
                )

            elif message_type == "ToolUseBlock":
                tool_name = getattr(message, 'name', 'unknown')
                tool_input = getattr(message, 'input', {})

                # Redact sensitive data from the input
                safe_input = _redact_sensitive(str(tool_input))

                tool_start = time.monotonic()
                log.info(
                    "tool_called",
                    tool=tool_name,
                    input_preview=safe_input[:300]
                )
                tool_calls.append({
                    "tool": tool_name,
                    "start": tool_start
                })

            elif message_type == "ToolResultBlock":
                if tool_calls:
                    last_call = tool_calls[-1]
                    duration = time.monotonic() - last_call["start"]
                    tool_output = str(getattr(message, 'content', ''))
                    safe_output = _redact_sensitive(tool_output)

                    log.info(
                        "tool_completed",
                        tool=last_call["tool"],
                        duration_seconds=round(duration, 3),
                        output_bytes=len(tool_output),
                        output_preview=safe_output[:200]
                    )

            elif hasattr(message, 'content'):
                content = str(message.content)
                if content:
                    results.append(content)

        log.info("tools_summary", total_tool_calls=len(tool_calls))

    return "\n".join(results)
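The redaction patterns deserve their own check before you trust them in production. A standalone exercise of the same three regexes (the sample secrets are fabricated):

```python
import re


def redact(text: str) -> str:
    """Same three patterns as _redact_sensitive above."""
    text = re.sub(r'sk-ant-[a-zA-Z0-9-]+', 'sk-ant-[REDACTED]', text)
    text = re.sub(r'password["\s:=]+[^\s"]+', 'password=[REDACTED]', text,
                  flags=re.IGNORECASE)
    text = re.sub(r'eyJ[a-zA-Z0-9._-]+', '[JWT_REDACTED]', text)
    return text


print(redact("key=sk-ant-abc123xyz"))           # key=sk-ant-[REDACTED]
print(redact("password: hunter2"))              # password=[REDACTED]
print(redact("Bearer eyJhbGciOiJIUzI1NiJ9.x"))  # Bearer [JWT_REDACTED]
```

Note these patterns are allow-nothing heuristics, not a guarantee: anything secret that does not match one of them (custom token formats, connection strings) will pass through, so extend the list for your own environment.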

TypeScript with Pino

// logger.ts
import pino, { Logger } from "pino";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const logger = pino({
  level: process.env.LOG_LEVEL ?? "info",
  // In production: JSON for log aggregators
  // In development: pretty print
  transport:
    process.env.NODE_ENV === "development"
      ? { target: "pino-pretty", options: { colorize: true } }
      : undefined,
  base: {
    service: "claude-agent",
    version: process.env.npm_package_version,
  },
  // Automatically redact sensitive fields
  redact: {
    paths: ["*.api_key", "*.password", "*.token", "*.secret"],
    censor: "[REDACTED]",
  },
});

interface AgentContext {
  queryId: string;
  sessionId?: string;
  userId?: string;
  agentName: string;
}

function createChildLogger(ctx: AgentContext): Logger {
  return logger.child(ctx);
}

async function runAgentWithLogging(
  prompt: string,
  agentName: string = "default"
): Promise<string> {
  const queryId = crypto.randomUUID().slice(0, 8);
  const log = createChildLogger({ queryId, agentName });
  const startTime = Date.now();

  log.info({ promptPreview: prompt.slice(0, 100) }, "query_started");

  const results: string[] = [];
  const toolCalls: Array<{ tool: string; start: number }> = [];

  try {
    for await (const message of query(prompt, {
      allowedTools: ["Read", "Edit", "Bash"],
      maxTurns: 50,
    } as ClaudeCodeOptions)) {
      const messageType = (message as { type?: string }).type;

      if (messageType === "tool_use") {
        const toolName = (message as { name?: string }).name ?? "unknown";
        log.info({ tool: toolName }, "tool_called");
        toolCalls.push({ tool: toolName, start: Date.now() });
      }

      if (messageType === "tool_result" && toolCalls.length > 0) {
        const last = toolCalls[toolCalls.length - 1];
        const durationMs = Date.now() - last.start;
        log.info({ tool: last.tool, durationMs }, "tool_completed");
      }

      if ("content" in message) {
        const content = String(message.content);
        if (content) results.push(content);
      }
    }

    const durationMs = Date.now() - startTime;
    log.info(
      { durationMs, totalToolCalls: toolCalls.length },
      "query_completed"
    );
    return results.join("\n");
  } catch (error) {
    const err = error as Error;
    const durationMs = Date.now() - startTime;
    log.error({ error: err.message, durationMs }, "query_failed");
    throw error;
  }
}

export { logger, createChildLogger, runAgentWithLogging };

Log Levels by Event Type

| Level | When to use                          | Examples                                          |
|-------|--------------------------------------|---------------------------------------------------|
| DEBUG | Development, diagnosis               | Full tool input/output, LLM reasoning             |
| INFO  | Normal business events               | Query started/completed, tool called              |
| WARN  | Unusual but non-critical situations  | Retry performed, elevated cost, timeout near      |
| ERROR | Failures that require attention      | Query failed, tool failed, API error              |
| FATAL | The process cannot continue          | Invalid config, port in use                       |
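With the standard library alone you can see the level filter in action: set the logger to INFO and DEBUG records are dropped before any handler sees them. A small demo (the logger name and messages are illustrative):

```python
import io
import logging

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("level_demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)  # production default: DEBUG is filtered out

logger.debug("tool input/output dump")            # dropped
logger.info("query_started")                      # kept
logger.warning("retry #2, cost above expected")   # kept

print(buf.getvalue())
```

The practical rule: ship INFO and above to your aggregator by default, and flip a single environment variable to DEBUG only while diagnosing a specific agent.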

Log Rotation and Retention

# log_rotation.py
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler


def configure_file_logging(log_dir: str = "/app/logs") -> None:
    """Configures file-based log rotation."""

    # Size-based rotation: at most 100MB per file, 10 files
    size_handler = RotatingFileHandler(
        f"{log_dir}/agent.log",
        maxBytes=100 * 1024 * 1024,  # 100MB
        backupCount=10,
        encoding="utf-8"
    )

    # Daily rotation: keep 30 days
    daily_handler = TimedRotatingFileHandler(
        f"{log_dir}/agent-daily.log",
        when="midnight",
        interval=1,
        backupCount=30,
        encoding="utf-8"
    )

    logging.getLogger().addHandler(size_handler)
    logging.getLogger().addHandler(daily_handler)
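Rotation behavior is easy to verify locally by shrinking the limits. A self-contained check with toy sizes (200 bytes instead of 100MB):

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

log_dir = tempfile.mkdtemp()
path = os.path.join(log_dir, "agent.log")

logger = logging.getLogger("rotation_demo")
logger.setLevel(logging.INFO)
# Tiny maxBytes so rotation triggers after a handful of records.
handler = RotatingFileHandler(path, maxBytes=200, backupCount=2, encoding="utf-8")
logger.addHandler(handler)

for i in range(20):
    logger.info("event number %d with some padding text", i)

handler.close()
# The active file plus rotated backups agent.log.1 / agent.log.2 now exist;
# older backups beyond backupCount=2 have been deleted.
print(sorted(os.listdir(log_dir)))
```

The same mechanics apply at 100MB: once `backupCount` is reached, the oldest file is discarded, which is your on-disk retention bound.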

3. Metrics with Prometheus

Key Metrics for Agents

flowchart LR
    subgraph Counters["Counters (only increase)"]
        C1["queries_total{status, agent}"]
        C2["tool_calls_total{tool, status}"]
        C3["cost_usd_total{agent}"]
        C4["errors_total{type, agent}"]
    end

    subgraph Histograms["Histograms (distribution)"]
        H1["query_duration_seconds{agent}"]
        H2["tool_duration_seconds{tool}"]
        H3["tokens_used{direction, agent}"]
    end

    subgraph Gauges["Gauges (current value)"]
        G1["active_queries{agent}"]
        G2["queue_depth"]
        G3["session_pool_size"]
    end

Python Implementation with prometheus-client

# metrics.py
from prometheus_client import (
    Counter, Histogram, Gauge,
    generate_latest, CONTENT_TYPE_LATEST,
    CollectorRegistry
)
import time
from functools import wraps
from typing import Callable, Any
from claude_code_sdk import query, ClaudeCodeOptions

# Custom registry (avoids clashes with the default process metrics)
REGISTRY = CollectorRegistry()

# ============================================================
# Metric definitions
# ============================================================

QUERIES_TOTAL = Counter(
    "agent_queries_total",
    "Total queries executed by the agent",
    ["status", "agent_name", "model"],
    registry=REGISTRY
)

TOOL_CALLS_TOTAL = Counter(
    "agent_tool_calls_total",
    "Total tool calls",
    ["tool_name", "status", "agent_name"],
    registry=REGISTRY
)

COST_USD_TOTAL = Counter(
    "agent_cost_usd_total",
    "Total cost in USD incurred by the agent",
    ["agent_name", "model"],
    registry=REGISTRY
)

ERRORS_TOTAL = Counter(
    "agent_errors_total",
    "Total errors by type",
    ["error_type", "agent_name"],
    registry=REGISTRY
)

QUERY_DURATION = Histogram(
    "agent_query_duration_seconds",
    "Query duration in seconds",
    ["agent_name"],
    buckets=[5, 10, 30, 60, 120, 300, 600, 1200, 1800],
    registry=REGISTRY
)

TOOL_DURATION = Histogram(
    "agent_tool_duration_seconds",
    "Per-tool duration in seconds",
    ["tool_name"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60],
    registry=REGISTRY
)

ACTIVE_QUERIES = Gauge(
    "agent_active_queries",
    "Number of queries currently running",
    ["agent_name"],
    registry=REGISTRY
)

TOKENS_USED = Histogram(
    "agent_tokens_used",  # histograms should not carry the _total suffix
    "Tokens used per query",
    ["direction", "agent_name"],  # direction: input/output
    buckets=[100, 500, 1000, 5000, 10000, 50000, 100000],
    registry=REGISTRY
)


# ============================================================
# Decorator to record metrics automatically
# ============================================================

def track_agent_metrics(agent_name: str = "default"):
    """Decorator that automatically tracks metrics for an agent function."""

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            ACTIVE_QUERIES.labels(agent_name=agent_name).inc()
            start_time = time.monotonic()
            status = "success"

            try:
                result = await func(*args, **kwargs)
                return result
            except Exception as e:
                status = "error"
                ERRORS_TOTAL.labels(
                    error_type=type(e).__name__,
                    agent_name=agent_name
                ).inc()
                raise
            finally:
                duration = time.monotonic() - start_time
                ACTIVE_QUERIES.labels(agent_name=agent_name).dec()
                QUERIES_TOTAL.labels(
                    status=status,
                    agent_name=agent_name,
                    model="claude-3-5-sonnet"
                ).inc()
                QUERY_DURATION.labels(agent_name=agent_name).observe(duration)

        return wrapper
    return decorator


# ============================================================
# Full usage with the SDK
# ============================================================

@track_agent_metrics(agent_name="code_reviewer")
async def run_code_review_agent(file_path: str) -> str:
    results = []
    total_cost = 0.0

    async for message in query(
        prompt=f"Review the code in {file_path} and find bugs",
        options=ClaudeCodeOptions(
            allowed_tools=["Read", "Bash"],
            max_turns=20
        )
    ):
        # Track cost
        if hasattr(message, 'cost_usd') and message.cost_usd:
            cost = float(message.cost_usd)
            total_cost += cost
            COST_USD_TOTAL.labels(
                agent_name="code_reviewer",
                model="claude-3-5-sonnet"
            ).inc(cost)

        # Track tools
        if hasattr(message, 'type') and message.type == "tool_use":
            tool_name = getattr(message, 'name', 'unknown')

            # Note: the result arrives in the next message. A real
            # implementation would correlate input and output in order
            # to also observe TOOL_DURATION.
            TOOL_CALLS_TOTAL.labels(
                tool_name=tool_name,
                status="called",
                agent_name="code_reviewer"
            ).inc()

        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    return "\n".join(results)


# ============================================================
# Prometheus metrics endpoint
# ============================================================

def get_metrics() -> tuple[bytes, str]:
    """Returns metrics in the Prometheus exposition format."""
    return generate_latest(REGISTRY), CONTENT_TYPE_LATEST
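The histogram buckets defined above are cumulative: each observation increments every bucket whose upper bound it does not exceed, and that structure is what the later `histogram_quantile` queries rely on. A stdlib-only sketch of the mechanism (`MiniHistogram` is illustrative, not prometheus_client's implementation):

```python
class MiniHistogram:
    """Cumulative-bucket histogram, the shape Prometheus expects."""

    def __init__(self, name: str, buckets: list[float]) -> None:
        self.name = name
        self.buckets = sorted(buckets)
        self.counts = [0] * len(self.buckets)  # cumulative counts per bound
        self.inf_count = 0                     # the implicit +Inf bucket
        self.total = 0.0

    def observe(self, value: float) -> None:
        # Every bucket whose upper bound >= value is incremented (cumulative).
        for i, bound in enumerate(self.buckets):
            if value <= bound:
                self.counts[i] += 1
        self.inf_count += 1
        self.total += value

    def expose(self) -> str:
        """Render in the Prometheus text exposition format."""
        lines = []
        for bound, count in zip(self.buckets, self.counts):
            lines.append(f'{self.name}_bucket{{le="{bound}"}} {count}')
        lines.append(f'{self.name}_bucket{{le="+Inf"}} {self.inf_count}')
        lines.append(f'{self.name}_sum {self.total}')
        lines.append(f'{self.name}_count {self.inf_count}')
        return "\n".join(lines)


h = MiniHistogram("agent_query_duration_seconds", [5, 10, 30, 60])
for duration in [3.2, 7.5, 42.0]:
    h.observe(duration)
print(h.expose())
```

To serve the real metrics, mount `get_metrics()` on a `/metrics` route in whatever HTTP framework the service already uses, returning the bytes with the `CONTENT_TYPE_LATEST` content type.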

TypeScript Implementation with prom-client

// metrics.ts
import {
  Registry,
  Counter,
  Histogram,
  Gauge,
  collectDefaultMetrics,
} from "prom-client";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const register = new Registry();
collectDefaultMetrics({ register });

// ============================================================
// Metric definitions
// ============================================================

const queriesTotal = new Counter({
  name: "agent_queries_total",
  help: "Total queries executed",
  labelNames: ["status", "agent_name", "model"] as const,
  registers: [register],
});

const toolCallsTotal = new Counter({
  name: "agent_tool_calls_total",
  help: "Total tool calls",
  labelNames: ["tool_name", "status", "agent_name"] as const,
  registers: [register],
});

const costUsdTotal = new Counter({
  name: "agent_cost_usd_total",
  help: "Total cost in USD",
  labelNames: ["agent_name", "model"] as const,
  registers: [register],
});

const queryDuration = new Histogram({
  name: "agent_query_duration_seconds",
  help: "Query duration in seconds",
  labelNames: ["agent_name"] as const,
  buckets: [5, 10, 30, 60, 120, 300, 600, 1800],
  registers: [register],
});

const toolDuration = new Histogram({
  name: "agent_tool_duration_seconds",
  help: "Tool duration in seconds",
  labelNames: ["tool_name"] as const,
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30],
  registers: [register],
});

const activeQueries = new Gauge({
  name: "agent_active_queries",
  help: "Queries currently running",
  labelNames: ["agent_name"] as const,
  registers: [register],
});

// ============================================================
// Agent with metrics
// ============================================================

async function runAgentWithMetrics(
  prompt: string,
  agentName: string = "default"
): Promise<string> {
  const startTime = Date.now();
  activeQueries.labels({ agent_name: agentName }).inc();

  const results: string[] = [];
  let status = "success";

  try {
    for await (const message of query(prompt, {
      allowedTools: ["Read", "Bash"],
      maxTurns: 30,
    } as ClaudeCodeOptions)) {
      if ("cost_usd" in message && typeof message.cost_usd === "number") {
        costUsdTotal
          .labels({ agent_name: agentName, model: "claude-3-5-sonnet" })
          .inc(message.cost_usd);
      }

      if ("type" in message && (message as { type: string }).type === "tool_use") {
        const toolName =
          (message as { name?: string }).name ?? "unknown";
        toolCallsTotal
          .labels({ tool_name: toolName, status: "called", agent_name: agentName })
          .inc();
      }

      if ("content" in message) {
        const content = String(message.content);
        if (content) results.push(content);
      }
    }

    return results.join("\n");
  } catch (error) {
    status = "error";
    throw error;
  } finally {
    const durationSeconds = (Date.now() - startTime) / 1000;
    activeQueries.labels({ agent_name: agentName }).dec();
    queriesTotal
      .labels({ status, agent_name: agentName, model: "claude-3-5-sonnet" })
      .inc();
    queryDuration.labels({ agent_name: agentName }).observe(durationSeconds);
  }
}

async function getMetrics(): Promise<{ body: string; contentType: string }> {
  return {
    body: await register.metrics(),
    contentType: register.contentType,
  };
}

export {
  register,
  queriesTotal,
  toolCallsTotal,
  costUsdTotal,
  queryDuration,
  activeQueries,
  runAgentWithMetrics,
  getMetrics,
};

4. OpenTelemetry for Traces

Tracing Concepts for Agents

gantt
    title Distributed trace of one agent query
    dateFormat  X
    axisFormat %Ss

    section Main query
    Span: query_total          :0, 45

    section LLM Calls
    Span: llm_call_1           :0, 3
    Span: llm_call_2           :8, 5
    Span: llm_call_3           :20, 4
    Span: llm_call_4           :35, 6

    section Tool Calls
    Span: tool_read_file       :3, 1
    Span: tool_bash_ls         :13, 2
    Span: tool_edit_file       :24, 6
    Span: tool_bash_test       :31, 3

Python Implementation with OpenTelemetry

# tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import os

from claude_code_sdk import query, ClaudeCodeOptions

def configure_tracing(service_name: str = "claude-agent") -> trace.Tracer:
    """Configures OpenTelemetry with a Jaeger exporter."""

    resource = Resource.create({
        "service.name": service_name,
        "service.version": os.getenv("APP_VERSION", "unknown"),
        "deployment.environment": os.getenv("ENVIRONMENT", "production"),
    })

    # Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name=os.getenv("JAEGER_HOST", "localhost"),
        agent_port=int(os.getenv("JAEGER_PORT", "6831")),
    )

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
    trace.set_tracer_provider(provider)

    return trace.get_tracer(service_name)

tracer = configure_tracing()


async def run_agent_with_tracing(
    prompt: str,
    agent_name: str = "default",
    parent_context: dict | None = None
) -> str:
    """Runs the agent with full traces for each step."""

    # Propagate trace context from the caller (for distributed systems)
    propagator = TraceContextTextMapPropagator()
    context = propagator.extract(parent_context or {})

    with tracer.start_as_current_span(
        "agent.query",
        context=context,
        attributes={
            "agent.name": agent_name,
            "prompt.length": len(prompt),
            "prompt.preview": prompt[:100],
        }
    ) as query_span:

        results = []
        llm_call_count = 0
        tool_call_count = 0
        total_cost = 0.0
        current_tool_span = None

        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    allowed_tools=["Read", "Edit", "Bash"],
                    max_turns=50
                )
            ):
                message_type = type(message).__name__

                if message_type == "AssistantMessage":
                    llm_call_count += 1
                    # One span per LLM call
                    with tracer.start_as_current_span(
                        "agent.llm_call",
                        attributes={
                            "llm.call_number": llm_call_count,
                            "model": "claude-3-5-sonnet",
                        }
                    ) as llm_span:
                        content = str(getattr(message, 'content', ''))
                        llm_span.set_attribute("response.length", len(content))

                elif message_type == "ToolUseBlock":
                    tool_name = getattr(message, 'name', 'unknown')
                    tool_call_count += 1

                    # Open a span for the tool (closed on ToolResultBlock)
                    current_tool_span = tracer.start_span(
                        f"agent.tool.{tool_name}",
                        attributes={
                            "tool.name": tool_name,
                            "tool.call_number": tool_call_count,
                        }
                    )

                elif message_type == "ToolResultBlock" and current_tool_span:
                    output = str(getattr(message, 'content', ''))
                    current_tool_span.set_attribute("tool.output_bytes", len(output))
                    current_tool_span.set_attribute(
                        "tool.status",
                        "error" if getattr(message, 'is_error', False) else "success"
                    )
                    current_tool_span.end()
                    current_tool_span = None

                if hasattr(message, 'cost_usd') and message.cost_usd:
                    # Accumulate cost locally; reading attributes back from a
                    # live span is not part of the OpenTelemetry API
                    total_cost += float(message.cost_usd)
                    query_span.set_attribute("agent.cost_usd", round(total_cost, 6))

                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))

            query_span.set_attribute("agent.llm_calls", llm_call_count)
            query_span.set_attribute("agent.tool_calls", tool_call_count)
            query_span.set_attribute("agent.status", "success")

        except Exception as e:
            query_span.set_attribute("agent.status", "error")
            query_span.record_exception(e)
            if current_tool_span:
                current_tool_span.set_attribute("tool.status", "error")
                current_tool_span.end()
            raise

        return "\n".join(results)

Context Propagation Between Agents

# multi_agent_tracing.py
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from claude_code_sdk import query, ClaudeCodeOptions
from tracing import tracer  # the tracer configured in tracing.py above

propagator = TraceContextTextMapPropagator()

async def orchestrator_with_tracing(task: str) -> str:
    """Orchestrator that propagates tracing context to subagents."""

    with tracer.start_as_current_span("orchestrator.execute") as span:
        span.set_attribute("task.description", task[:100])

        # Inject the current context into a carrier to hand to subagents
        carrier: dict = {}
        propagator.inject(carrier)

        # Subagent 1: analysis
        with tracer.start_as_current_span("orchestrator.call_analyzer"):
            analysis = await call_subagent(
                prompt=f"Analyze: {task}",
                trace_context=carrier  # Propagate context
            )

        # Subagent 2: implementation
        with tracer.start_as_current_span("orchestrator.call_implementer"):
            implementation = await call_subagent(
                prompt=f"Implement based on: {analysis}",
                trace_context=carrier
            )

        return implementation


async def call_subagent(prompt: str, trace_context: dict) -> str:
    """Subagent that continues the orchestrator's trace."""
    # Restore the parent's context
    ctx = propagator.extract(trace_context)

    with tracer.start_as_current_span(
        "subagent.execute",
        context=ctx  # Continue the parent's trace
    ) as span:
        results = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"])
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
        return "\n".join(results)

Exporting to Jaeger with Docker

# docker-compose with Jaeger
services:
  jaeger:
    image: jaegertracing/all-in-one:1.58
    ports:
      - "16686:16686"  # UI
      - "6831:6831/udp"  # Thrift compact
      - "4317:4317"  # OTLP gRPC
    environment:
      COLLECTOR_OTLP_ENABLED: "true"
      SPAN_STORAGE_TYPE: badger
    volumes:
      - jaeger_data:/badger

volumes:
  jaeger_data:

5. Grafana Dashboards

Agent Overview

flowchart LR
    subgraph GrafanaDashboard["Dashboard: Claude Agent Overview"]
        direction TB
        subgraph Row1["Row 1: Activity"]
            P1["Queries/min\nStat Panel"]
            P2["Active Queries\nGauge Panel"]
            P3["Success Rate\nStat Panel"]
        end
        subgraph Row2["Row 2: Latency"]
            P4["Latency p50/p95/p99\nTime Series"]
            P5["Duration distribution\nHeatmap"]
        end
        subgraph Row3["Row 3: Costs"]
            P6["Cost USD/hour\nTime Series"]
            P7["Cost accumulated today\nStat Panel"]
            P8["Top tools by cost\nBar Chart"]
        end
        subgraph Row4["Row 4: Errors"]
            P9["Error rate\nTime Series"]
            P10["Errors by type\nPie Chart"]
        end
    end

Dashboard JSON to Import

{
  "title": "Claude Agent Overview",
  "uid": "claude-agent-overview",
  "tags": ["claude", "agent", "ai"],
  "time": { "from": "now-6h", "to": "now" },
  "refresh": "30s",
  "panels": [
    {
      "id": 1,
      "title": "Queries per minute",
      "type": "stat",
      "gridPos": { "h": 4, "w": 4, "x": 0, "y": 0 },
      "targets": [
        {
          "expr": "rate(agent_queries_total[1m]) * 60",
          "legendFormat": "queries/min"
        }
      ],
      "options": {
        "reduceOptions": { "calcs": ["lastNotNull"] },
        "colorMode": "background",
        "thresholds": {
          "steps": [
            { "value": 0, "color": "green" },
            { "value": 10, "color": "yellow" },
            { "value": 50, "color": "red" }
          ]
        }
      }
    },
    {
      "id": 2,
      "title": "Latencia p50/p95/p99",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 },
      "targets": [
        {
          "expr": "histogram_quantile(0.50, rate(agent_query_duration_seconds_bucket[5m]))",
          "legendFormat": "p50"
        },
        {
          "expr": "histogram_quantile(0.95, rate(agent_query_duration_seconds_bucket[5m]))",
          "legendFormat": "p95"
        },
        {
          "expr": "histogram_quantile(0.99, rate(agent_query_duration_seconds_bucket[5m]))",
          "legendFormat": "p99"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "s",
          "custom": { "lineWidth": 2 }
        }
      }
    },
    {
      "id": 3,
      "title": "Costo USD acumulado hoy",
      "type": "stat",
      "gridPos": { "h": 4, "w": 4, "x": 8, "y": 0 },
      "targets": [
        {
          "expr": "increase(agent_cost_usd_total[24h])",
          "legendFormat": "USD hoy"
        }
      ],
      "options": {
        "reduceOptions": { "calcs": ["sum"] },
        "colorMode": "background",
        "thresholds": {
          "steps": [
            { "value": 0, "color": "green" },
            { "value": 20, "color": "yellow" },
            { "value": 50, "color": "red" }
          ]
        }
      },
      "fieldConfig": {
        "defaults": { "unit": "currencyUSD" }
      }
    },
    {
      "id": 4,
      "title": "Error Rate",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 },
      "targets": [
        {
          "expr": "rate(agent_queries_total{status='error'}[5m]) / rate(agent_queries_total[5m])",
          "legendFormat": "error rate"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "percentunit",
          "thresholds": {
            "steps": [
              { "value": 0, "color": "green" },
              { "value": 0.05, "color": "red" }
            ]
          }
        }
      }
    },
    {
      "id": 5,
      "title": "Herramientas más usadas",
      "type": "barchart",
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 },
      "targets": [
        {
          "expr": "sum by (tool_name) (increase(agent_tool_calls_total[24h]))",
          "legendFormat": "{{ tool_name }}"
        }
      ]
    }
  ]
}
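El panel de latencia usa `histogram_quantile` sobre los buckets del histograma. Para intuir qué calcula esa función, un boceto hipotético en Python puro que replica la interpolación lineal de Prometheus (simplificado: ignora counter resets y la extrapolación que hace `rate` en los bordes de la ventana):

```python
def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Aproxima histogram_quantile de PromQL.

    buckets: lista de (le, conteo_acumulado) ordenada por 'le' ascendente;
    el último bucket debe tener le = +inf, como en Prometheus.
    """
    total = buckets[-1][1]
    rank = q * total  # posición de la observación buscada
    prev_le, prev_count = 0.0, 0.0
    for le, count in buckets:
        if count >= rank:
            if le == float("inf"):
                # Si el cuantil cae en el bucket +Inf, Prometheus devuelve
                # el límite superior del último bucket finito
                return prev_le
            # Interpolación lineal dentro del bucket
            return prev_le + (le - prev_le) * (rank - prev_count) / (count - prev_count)
        prev_le, prev_count = le, count
    return prev_le


# Ejemplo: 25 queries observadas; 10 tardaron <= 0.5s, 20 tardaron <= 1.0s
buckets = [(0.5, 10.0), (1.0, 20.0), (float("inf"), 25.0)]
p50 = histogram_quantile(0.50, buckets)  # → 0.625
```

Consecuencia práctica: si el p99 cae en el bucket +Inf, el panel se satura en el límite del último bucket finito, por lo que conviene definir buckets que cubran las latencias reales del agente.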

6. Alertas

Reglas de Alerta de Prometheus

Las reglas se evalúan en Prometheus; Alertmanager solo agrupa y enruta las notificaciones resultantes.

# alerting-rules.yaml
groups:
  - name: claude-agent-critical
    interval: 30s
    rules:
      # Error rate alto
      - alert: AgentHighErrorRate
        expr: |
          (
            rate(agent_queries_total{status="error"}[5m])
            /
            rate(agent_queries_total[5m])
          ) > 0.05
        for: 5m
        labels:
          severity: critical
          team: platform
        annotations:
          summary: "Error rate del agente {{ $labels.agent_name }} > 5%"
          description: |
            El agente {{ $labels.agent_name }} tiene un error rate de
            {{ $value | humanizePercentage }} en los últimos 5 minutos.
            Revisar logs: kubectl logs -n claude-agents -l app=claude-agent --tail=100
          runbook_url: "https://wiki.example.com/runbooks/agent-high-error-rate"

      # Costo diario excedido
      - alert: AgentDailyCostExceeded
        expr: |
          increase(agent_cost_usd_total[24h]) > 50
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Costo diario del agente excedió $50"
          description: |
            El agente {{ $labels.agent_name }} ha gastado
            ${{ $value | humanize }} USD en las últimas 24 horas.
          runbook_url: "https://wiki.example.com/runbooks/agent-high-cost"

      # Latencia alta
      - alert: AgentHighLatency
        expr: |
          histogram_quantile(
            0.99,
            rate(agent_query_duration_seconds_bucket[10m])
          ) > 1800
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Latencia p99 del agente > 30 minutos"
          description: |
            El p99 de latencia del agente {{ $labels.agent_name }} es
            {{ $value | humanizeDuration }}. Podría indicar queries atascados.

      # Agente sin actividad (posible fallo silencioso).
      # Nota: absent() solo detecta la desaparición de la métrica; un contador
      # que existe pero no avanza da rate() == 0, no absent.
      - alert: AgentNoActivity
        expr: |
          sum(rate(agent_queries_total[15m])) == 0 or absent(agent_queries_total)
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "No se han procesado queries en 15 minutos"
          description: "El agente puede estar caído o en estado de error silencioso."

      # Queue de trabajos creciendo
      - alert: AgentQueueDepthHigh
        expr: agent_queue_depth > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue del agente tiene más de 100 trabajos pendientes"
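Para entender cuándo dispara `AgentHighErrorRate`, conviene recordar qué calcula `rate()`: la pendiente del contador dentro de la ventana. Un boceto simplificado en Python (ignora counter resets y la extrapolación que aplica Prometheus en los bordes de la ventana):

```python
def counter_rate(samples: list[tuple[float, float]]) -> float:
    """rate() simplificado: incremento del contador / segundos transcurridos.

    samples: lista de (timestamp_segundos, valor_del_contador).
    """
    (t0, v0), (t1, v1) = samples[0], samples[-1]
    return (v1 - v0) / (t1 - t0)


# Ventana de 5 minutos (300 s):
errores = [(0.0, 100.0), (300.0, 130.0)]    # 30 errores en la ventana
totales = [(0.0, 2000.0), (300.0, 2400.0)]  # 400 queries en la ventana

error_rate = counter_rate(errores) / counter_rate(totales)  # 30/400 = 0.075
# 0.075 > 0.05: si se sostiene durante "for: 5m", la alerta dispara
```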

Configuración de Notificaciones

# alertmanager.yaml
global:
  resolve_timeout: 5m
  slack_api_url: "${SLACK_WEBHOOK_URL}"

route:
  group_by: ["alertname", "agent_name"]
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: default

  routes:
    # Críticos: PagerDuty inmediato
    - match:
        severity: critical
      receiver: pagerduty
      continue: true

    # Todas las alertas: Slack
    - match_re:
        severity: warning|critical
      receiver: slack

receivers:
  - name: default
    slack_configs:
      - channel: "#alerts-claude-agent"
        text: "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"

  - name: slack
    slack_configs:
      - channel: "#alerts-claude-agent"
        title: "{{ .GroupLabels.alertname }}"
        text: |
          *Severidad:* {{ .GroupLabels.severity }}
          *Agente:* {{ .GroupLabels.agent_name }}
          {{ range .Alerts }}
          *Descripción:* {{ .Annotations.description }}
          *Runbook:* {{ .Annotations.runbook_url }}
          {{ end }}
        color: '{{ if eq .Status "firing" }}danger{{ else }}good{{ end }}'

  - name: pagerduty
    pagerduty_configs:
      - service_key: "${PAGERDUTY_SERVICE_KEY}"
        description: "{{ .GroupLabels.alertname }}: {{ .CommonAnnotations.summary }}"

7. Sentry para Error Tracking

Configuración de Sentry

# sentry_setup.py
import os

import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.fastapi import FastApiIntegration

from claude_code_sdk import query, ClaudeCodeOptions


def configure_sentry() -> None:
    """Configura Sentry para captura automática de errores."""

    sentry_sdk.init(
        dsn=os.getenv("SENTRY_DSN"),
        environment=os.getenv("ENVIRONMENT", "production"),
        release=os.getenv("APP_VERSION", "unknown"),
        traces_sample_rate=0.1,  # 10% de transacciones para performance

        integrations=[
            FastApiIntegration(),
            AsyncioIntegration(),
        ],

        # No capturar datos sensibles
        before_send=filter_sensitive_data,
    )


def filter_sensitive_data(event: dict, hint: dict) -> dict | None:
    """Filtra datos sensibles antes de enviar a Sentry."""
    # Remover API keys de los extras
    if "extra" in event:
        for key in list(event["extra"].keys()):
            if any(word in key.lower() for word in ["key", "token", "password", "secret"]):
                event["extra"][key] = "[FILTERED]"

    return event


async def run_agent_with_sentry(prompt: str) -> str:
    """Agente con contexto de Sentry para mejor error tracking."""

    with sentry_sdk.push_scope() as scope:
        # Añadir contexto del agente al scope de Sentry
        scope.set_tag("agent.name", "production_agent")
        scope.set_context("agent_request", {
            "prompt_preview": prompt[:200],
            "prompt_length": len(prompt),
        })

        try:
            results = []
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    allowed_tools=["Read", "Bash"],
                    max_turns=30
                )
            ):
                if hasattr(message, 'session_id') and message.session_id:
                    # Añadir session_id al scope para correlación
                    scope.set_tag("session.id", message.session_id)

                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))

            return "\n".join(results)

        except Exception as e:
            sentry_sdk.capture_exception(e)
            raise
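`filter_sensitive_data` solo filtra las claves de primer nivel en `event["extra"]`; los breadcrumbs y el contexto de request pueden anidar secretos más profundo. Un boceto de scrubber recursivo (la lista `SENSITIVE` es una suposición; ajustarla a cada caso):

```python
from typing import Any

SENSITIVE = ("key", "token", "password", "secret", "authorization")


def scrub(obj: Any) -> Any:
    """Reemplaza recursivamente los valores cuyas claves sugieren secretos."""
    if isinstance(obj, dict):
        return {
            k: "[FILTERED]" if any(w in k.lower() for w in SENSITIVE) else scrub(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [scrub(v) for v in obj]
    return obj


event = {
    "extra": {"api_key": "sk-...", "prompt_length": 42},
    "breadcrumbs": [{"data": {"slack_token": "xoxb-..."}}],
}
clean = scrub(event)
# clean["extra"]["api_key"] y clean["breadcrumbs"][0]["data"]["slack_token"]
# quedan como "[FILTERED]"; prompt_length se conserva
```

Se conectaría a Sentry como `before_send=lambda event, hint: scrub(event)`.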

8. Audit Trail

Registro Inmutable de Acciones

# audit.py
import json
import time
import hashlib
from dataclasses import dataclass, asdict
import asyncpg  # PostgreSQL async


@dataclass
class AuditEntry:
    """Entrada del audit trail."""
    timestamp: float
    entry_id: str
    user_id: str | None
    session_id: str | None
    agent_name: str
    action_type: str  # "query_started", "tool_called", "query_completed", "query_failed"
    action_details: dict
    cost_usd: float | None
    previous_hash: str  # Hash del entry anterior para cadena inmutable

    def to_dict(self) -> dict:
        return asdict(self)


class AuditTrail:
    """Audit trail append-only con cadena de hashes para integridad."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool

    async def append(
        self,
        user_id: str | None,
        session_id: str | None,
        agent_name: str,
        action_type: str,
        action_details: dict,
        cost_usd: float | None = None
    ) -> AuditEntry:
        # Obtener hash del último entry para encadenar
        last_hash = await self._get_last_hash()

        entry_id = hashlib.sha256(
            f"{time.time()}{session_id}{action_type}".encode()
        ).hexdigest()[:16]

        entry = AuditEntry(
            timestamp=time.time(),
            entry_id=entry_id,
            user_id=user_id,
            session_id=session_id,
            agent_name=agent_name,
            action_type=action_type,
            action_details=action_details,
            cost_usd=cost_usd,
            previous_hash=last_hash
        )

        # Calcular hash de este entry
        entry_hash = hashlib.sha256(
            json.dumps(entry.to_dict(), sort_keys=True).encode()
        ).hexdigest()

        # Insertar en DB (append-only, no hay UPDATE ni DELETE)
        await self.db.execute(
            """
            INSERT INTO audit_trail
            (timestamp, entry_id, user_id, session_id, agent_name,
             action_type, action_details, cost_usd, previous_hash, entry_hash)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            """,
            entry.timestamp, entry.entry_id, entry.user_id,
            entry.session_id, entry.agent_name, entry.action_type,
            json.dumps(entry.action_details), entry.cost_usd,
            entry.previous_hash, entry_hash
        )

        return entry

    async def _get_last_hash(self) -> str:
        row = await self.db.fetchrow(
            "SELECT entry_hash FROM audit_trail ORDER BY timestamp DESC LIMIT 1"
        )
        return row["entry_hash"] if row else "genesis"

    async def query_by_session(self, session_id: str) -> list[dict]:
        rows = await self.db.fetch(
            """
            SELECT * FROM audit_trail
            WHERE session_id = $1
            ORDER BY timestamp ASC
            """,
            session_id
        )
        return [dict(row) for row in rows]
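La cadena de hashes solo aporta integridad si alguien la verifica. Un boceto de verificación, asumiendo que cada dict contiene exactamente los campos de `AuditEntry` (la misma serialización que usó `append`):

```python
import hashlib
import json


def compute_entry_hash(entry: dict) -> str:
    """Mismo hash que calcula AuditTrail.append sobre entry.to_dict()."""
    return hashlib.sha256(
        json.dumps(entry, sort_keys=True).encode()
    ).hexdigest()


def verify_chain(entries: list[dict]) -> bool:
    """Recorre la cadena completa: el primer entry debe apuntar a 'genesis'
    y cada previous_hash debe coincidir con el hash del entry anterior."""
    expected_prev = "genesis"
    for entry in entries:
        if entry["previous_hash"] != expected_prev:
            return False
        expected_prev = compute_entry_hash(entry)
    return True
```

Para verificar los rows devueltos por `query_by_session` habría que descartar antes las columnas añadidas por la DB (`id`, `entry_hash`, `inserted_at`), de modo que el dict recomputado coincida con el original.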

Schema SQL:

-- schema.sql
CREATE TABLE audit_trail (
    id BIGSERIAL PRIMARY KEY,
    timestamp DOUBLE PRECISION NOT NULL,
    entry_id VARCHAR(16) NOT NULL UNIQUE,
    user_id VARCHAR(255),
    session_id VARCHAR(255),
    agent_name VARCHAR(100) NOT NULL,
    action_type VARCHAR(50) NOT NULL,
    action_details JSONB NOT NULL DEFAULT '{}',
    cost_usd NUMERIC(10, 6),
    previous_hash VARCHAR(64) NOT NULL,
    entry_hash VARCHAR(64) NOT NULL UNIQUE,
    -- No permitir UPDATE ni DELETE via policy
    inserted_at TIMESTAMPTZ DEFAULT NOW()
);

-- Índices para consultas comunes
CREATE INDEX idx_audit_session ON audit_trail(session_id);
CREATE INDEX idx_audit_user ON audit_trail(user_id);
CREATE INDEX idx_audit_timestamp ON audit_trail(timestamp);
CREATE INDEX idx_audit_action_type ON audit_trail(action_type);

-- Row Level Security: INSERT y SELECT permitidos; UPDATE y DELETE denegados
ALTER TABLE audit_trail ENABLE ROW LEVEL SECURITY;
CREATE POLICY audit_insert_only ON audit_trail FOR INSERT TO app_user WITH CHECK (true);
-- Sin política de SELECT, query_by_session fallaría bajo RLS
CREATE POLICY audit_select ON audit_trail FOR SELECT TO app_user USING (true);
-- Sin políticas para UPDATE/DELETE = denegado por defecto

9. Dashboard de Costos

Tracking de Costos con Python y PostgreSQL

# cost_tracker.py
from datetime import datetime

import asyncpg
from claude_code_sdk import query, ClaudeCodeOptions


async def track_agent_with_costs(
    prompt: str,
    user_id: str,
    agent_name: str,
    db_pool: asyncpg.Pool
) -> str:
    """Ejecuta el agente y guarda métricas de costo en PostgreSQL."""

    session_id = None
    total_cost = 0.0
    total_input_tokens = 0
    total_output_tokens = 0
    results = []
    start_time = datetime.utcnow()

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(
            allowed_tools=["Read", "Edit", "Bash"],
            max_turns=50
        )
    ):
        if hasattr(message, 'session_id') and message.session_id:
            session_id = message.session_id

        if hasattr(message, 'cost_usd') and message.cost_usd:
            total_cost += float(message.cost_usd)

        if hasattr(message, 'usage'):
            usage = message.usage
            if hasattr(usage, 'input_tokens'):
                total_input_tokens += usage.input_tokens
            if hasattr(usage, 'output_tokens'):
                total_output_tokens += usage.output_tokens

        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    # Guardar registro de costo
    await db_pool.execute(
        """
        INSERT INTO agent_cost_log
        (session_id, user_id, agent_name, prompt_preview, started_at,
         duration_seconds, cost_usd, input_tokens, output_tokens)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        """,
        session_id,
        user_id,
        agent_name,
        prompt[:200],
        start_time,
        (datetime.utcnow() - start_time).total_seconds(),
        total_cost,
        total_input_tokens,
        total_output_tokens
    )

    return "\n".join(results)


async def get_cost_summary(
    db_pool: asyncpg.Pool,
    period_days: int = 30
) -> dict:
    """Obtiene resumen de costos por agente y usuario."""

    rows = await db_pool.fetch(
        """
        SELECT
            agent_name,
            user_id,
            COUNT(*) as query_count,
            SUM(cost_usd) as total_cost_usd,
            AVG(cost_usd) as avg_cost_usd,
            AVG(duration_seconds) as avg_duration_seconds,
            SUM(input_tokens) as total_input_tokens,
            SUM(output_tokens) as total_output_tokens
        FROM agent_cost_log
        WHERE started_at > NOW() - INTERVAL '1 day' * $1
        GROUP BY agent_name, user_id
        ORDER BY total_cost_usd DESC
        """,
        period_days
    )

    return {
        "period_days": period_days,
        "agents": [dict(row) for row in rows],
        "total_usd": sum(float(row["total_cost_usd"] or 0) for row in rows)
    }


async def project_monthly_cost(
    db_pool: asyncpg.Pool
) -> dict:
    """Proyecta el costo mensual basado en los últimos 7 días."""

    row = await db_pool.fetchrow(
        """
        SELECT
            SUM(cost_usd) as last_7_days_cost,
            COUNT(*) as last_7_days_queries
        FROM agent_cost_log
        WHERE started_at > NOW() - INTERVAL '7 days'
        """
    )

    if not row or not row["last_7_days_cost"]:
        return {"projected_monthly_usd": 0, "daily_average_usd": 0}

    daily_avg = float(row["last_7_days_cost"]) / 7
    projected_monthly = daily_avg * 30

    return {
        "last_7_days_usd": float(row["last_7_days_cost"]),
        "last_7_days_queries": row["last_7_days_queries"],
        "daily_average_usd": round(daily_avg, 4),
        "projected_monthly_usd": round(projected_monthly, 2),
    }
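La proyección sirve para reporting, pero en producción conviene cortar antes de que el gasto se dispare. Un boceto hipotético de guard previo a cada query (los nombres y el límite de $50 son suposiciones; el gasto del día vendría de `agent_cost_log`):

```python
from dataclasses import dataclass


@dataclass
class BudgetDecision:
    allowed: bool
    reason: str
    remaining_usd: float


def evaluate_budget(
    spent_today_usd: float,
    daily_limit_usd: float = 50.0,
    estimated_query_cost_usd: float = 0.10,
) -> BudgetDecision:
    """Decide si permitir un query dado el gasto acumulado del día."""
    remaining = daily_limit_usd - spent_today_usd
    if remaining <= 0:
        return BudgetDecision(False, "daily_limit_exceeded", 0.0)
    if estimated_query_cost_usd > remaining:
        return BudgetDecision(False, "insufficient_remaining_budget", remaining)
    return BudgetDecision(True, "ok", remaining)


# Antes de llamar al agente (boceto, nombres hipotéticos):
# spent = await db_pool.fetchval(
#     "SELECT COALESCE(SUM(cost_usd), 0) FROM agent_cost_log "
#     "WHERE user_id = $1 AND started_at > DATE_TRUNC('day', NOW())", user_id)
# decision = evaluate_budget(float(spent))
# if not decision.allowed:
#     raise RuntimeError(f"Budget: {decision.reason}")
```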

Schema SQL para costos:

-- cost_schema.sql
CREATE TABLE agent_cost_log (
    id BIGSERIAL PRIMARY KEY,
    session_id VARCHAR(255),
    user_id VARCHAR(255) NOT NULL,
    agent_name VARCHAR(100) NOT NULL,
    prompt_preview TEXT,
    started_at TIMESTAMPTZ NOT NULL,
    duration_seconds NUMERIC(10, 2),
    cost_usd NUMERIC(10, 6) DEFAULT 0,
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0
);

-- Índices
CREATE INDEX idx_cost_user ON agent_cost_log(user_id);
CREATE INDEX idx_cost_agent ON agent_cost_log(agent_name);
CREATE INDEX idx_cost_started_at ON agent_cost_log(started_at);

-- Vista para proyecciones
CREATE VIEW daily_cost_summary AS
SELECT
    DATE_TRUNC('day', started_at) as day,
    agent_name,
    COUNT(*) as queries,
    SUM(cost_usd) as total_cost_usd,
    AVG(cost_usd) as avg_cost_per_query
FROM agent_cost_log
GROUP BY 1, 2
ORDER BY 1 DESC, 4 DESC;

Panel de Grafana para costos con PostgreSQL como datasource:

{
  "title": "Proyección de costos mensual",
  "type": "stat",
  "targets": [
    {
      "rawSql": "SELECT SUM(cost_usd) * (30.0 / 7.0) as projected_monthly FROM agent_cost_log WHERE started_at > NOW() - INTERVAL '7 days'",
      "format": "table"
    }
  ],
  "fieldConfig": {
    "defaults": {
      "unit": "currencyUSD",
      "thresholds": {
        "steps": [
          { "value": 0, "color": "green" },
          { "value": 50, "color": "yellow" },
          { "value": 100, "color": "red" }
        ]
      }
    }
  }
}

Resumen del Capítulo

La observabilidad para agentes autónomos requiere los tres pilares coordinados:

flowchart LR
    Agent["Agente\nen ejecución"] --> Logs["Structlog/Pino\n→ Loki"]
    Agent --> Metrics["prometheus-client\n→ Prometheus → Grafana"]
    Agent --> Traces["OpenTelemetry\n→ Jaeger"]
    Agent --> Errors["Sentry\n→ Error tracking"]
    Agent --> Costs["PostgreSQL\n→ Cost dashboard"]

    Logs --> Alerts["AlertManager\n→ Slack/PagerDuty"]
    Metrics --> Alerts

Puntos clave:

- Logs estructurados (Structlog/Pino) con session_id para correlacionar los eventos de una misma sesión.
- Métricas Prometheus (queries, latencia, costo, errores) visualizadas en dashboards de Grafana.
- Trazas OpenTelemetry para entender en qué se gasta el tiempo de cada query.
- Alertas sobre error rate, costo diario, latencia p99 e inactividad del agente.
- Sentry para error tracking con contexto del agente y filtrado de datos sensibles.
- Audit trail append-only con cadena de hashes para integridad verificable.
- Tracking de costos en PostgreSQL con proyección mensual.

Próximo capítulo: Capítulo 19: Patrones Multi-Agente Avanzados — Reflexión, debate, ensemble y sistemas con memoria compartida.


10. Dashboard Grafana Completo

JSON Completo para Importar en Grafana

El siguiente dashboard cubre los seis paneles críticos para producción: query rate, latencia por percentil, costos, heatmap de herramientas, error rate y queries activos. Importar desde Dashboards → Import → Paste JSON.

{
  "title": "Claude Agent — Production Dashboard",
  "uid": "claude-agent-prod-v2",
  "schemaVersion": 38,
  "version": 1,
  "tags": ["claude", "agent", "ai", "production"],
  "time": { "from": "now-6h", "to": "now" },
  "refresh": "30s",
  "templating": {
    "list": [
      {
        "name": "agent",
        "label": "Agente",
        "type": "query",
        "datasource": { "type": "prometheus", "uid": "prometheus" },
        "query": "label_values(agent_queries_total, agent_name)",
        "multi": true,
        "includeAll": true,
        "current": { "text": "All", "value": "$__all" }
      },
      {
        "name": "model",
        "label": "Modelo",
        "type": "query",
        "datasource": { "type": "prometheus", "uid": "prometheus" },
        "query": "label_values(agent_queries_total, model)",
        "multi": false,
        "includeAll": true
      },
      {
        "name": "user_id",
        "label": "Usuario",
        "type": "query",
        "datasource": { "type": "prometheus", "uid": "prometheus" },
        "query": "label_values(agent_queries_total, user_id)",
        "multi": true,
        "includeAll": true
      }
    ]
  },
  "panels": [
    {
      "id": 10,
      "title": "Query Rate (queries/min)",
      "type": "stat",
      "gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 },
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "targets": [
        {
          "expr": "sum(rate(agent_queries_total{agent_name=~\"$agent\"}[1m])) * 60",
          "legendFormat": "queries/min"
        }
      ],
      "options": {
        "reduceOptions": { "calcs": ["lastNotNull"] },
        "colorMode": "background",
        "graphMode": "area",
        "orientation": "auto",
        "thresholds": {
          "mode": "absolute",
          "steps": [
            { "value": null, "color": "green" },
            { "value": 30, "color": "yellow" },
            { "value": 100, "color": "red" }
          ]
        }
      }
    },
    {
      "id": 11,
      "title": "Latencia P50 / P95 / P99",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 },
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "targets": [
        {
          "expr": "histogram_quantile(0.50, sum by (le) (rate(agent_query_duration_seconds_bucket{agent_name=~\"$agent\"}[5m])))",
          "legendFormat": "p50"
        },
        {
          "expr": "histogram_quantile(0.95, sum by (le) (rate(agent_query_duration_seconds_bucket{agent_name=~\"$agent\"}[5m])))",
          "legendFormat": "p95"
        },
        {
          "expr": "histogram_quantile(0.99, sum by (le) (rate(agent_query_duration_seconds_bucket{agent_name=~\"$agent\"}[5m])))",
          "legendFormat": "p99"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "s",
          "custom": {
            "lineWidth": 2,
            "fillOpacity": 10
          },
          "thresholds": {
            "mode": "absolute",
            "steps": [
              { "value": null, "color": "green" },
              { "value": 60, "color": "yellow" },
              { "value": 300, "color": "red" }
            ]
          }
        },
        "overrides": [
          { "matcher": { "id": "byName", "options": "p99" }, "properties": [{ "id": "color", "value": { "fixedColor": "red", "mode": "fixed" } }] },
          { "matcher": { "id": "byName", "options": "p95" }, "properties": [{ "id": "color", "value": { "fixedColor": "orange", "mode": "fixed" } }] },
          { "matcher": { "id": "byName", "options": "p50" }, "properties": [{ "id": "color", "value": { "fixedColor": "green", "mode": "fixed" } }] }
        ]
      }
    },
    {
      "id": 12,
      "title": "Costo USD Over Time",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 },
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "targets": [
        {
          "expr": "sum by (agent_name) (rate(agent_cost_usd_total{agent_name=~\"$agent\"}[5m])) * 3600",
          "legendFormat": "{{ agent_name }} USD/hora"
        },
        {
          "expr": "sum(increase(agent_cost_usd_total{agent_name=~\"$agent\"}[24h]))",
          "legendFormat": "Total USD hoy"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "currencyUSD",
          "custom": { "lineWidth": 2, "fillOpacity": 20 }
        }
      }
    },
    {
      "id": 13,
      "title": "Tool Usage Heatmap",
      "type": "heatmap",
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 },
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "targets": [
        {
          "expr": "sum by (tool_name) (increase(agent_tool_calls_total{agent_name=~\"$agent\"}[1h]))",
          "legendFormat": "{{ tool_name }}"
        }
      ],
      "options": {
        "calculate": false,
        "color": {
          "scheme": "Oranges",
          "steps": 64
        },
        "yAxis": { "axisLabel": "Herramienta" },
        "tooltip": { "showHistogram": true }
      }
    },
    {
      "id": 14,
      "title": "Error Rate por Agente",
      "type": "timeseries",
      "gridPos": { "h": 8, "w": 12, "x": 12, "y": 12 },
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "targets": [
        {
          "expr": "sum by (agent_name) (rate(agent_queries_total{status=\"error\", agent_name=~\"$agent\"}[5m])) / sum by (agent_name) (rate(agent_queries_total{agent_name=~\"$agent\"}[5m]))",
          "legendFormat": "{{ agent_name }}"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "percentunit",
          "min": 0,
          "max": 1,
          "thresholds": {
            "mode": "absolute",
            "steps": [
              { "value": null, "color": "green" },
              { "value": 0.01, "color": "yellow" },
              { "value": 0.05, "color": "red" }
            ]
          }
        }
      }
    },
    {
      "id": 15,
      "title": "Active Queries (Gauge)",
      "type": "gauge",
      "gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 },
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "targets": [
        {
          "expr": "sum(agent_active_queries{agent_name=~\"$agent\"})",
          "legendFormat": "activos"
        }
      ],
      "options": {
        "minValue": 0,
        "maxValue": 50,
        "thresholds": {
          "mode": "absolute",
          "steps": [
            { "value": null, "color": "green" },
            { "value": 20, "color": "yellow" },
            { "value": 40, "color": "red" }
          ]
        }
      }
    }
  ],
  "annotations": {
    "list": [
      {
        "name": "Deployments",
        "datasource": { "type": "prometheus", "uid": "prometheus" },
        "expr": "changes(agent_queries_total[5m]) > 0",
        "iconColor": "blue",
        "hide": false,
        "enable": true,
        "step": "60s"
      }
    ]
  }
}

Provisioning Automático via Docker/Kubernetes

Para que el dashboard se instale automáticamente al levantar Grafana:

# grafana-provisioning/dashboards/provisioner.yaml
apiVersion: 1
providers:
  - name: claude-agent-dashboards
    orgId: 1
    type: file
    disableDeletion: false
    updateIntervalSeconds: 30
    allowUiUpdates: true
    options:
      path: /var/lib/grafana/dashboards
      foldersFromFilesStructure: true

# docker-compose con Grafana completo
services:
  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - "3000:3000"
    volumes:
      - ./grafana-provisioning:/etc/grafana/provisioning
      - ./grafana-dashboards:/var/lib/grafana/dashboards
      - grafana_data:/var/lib/grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: "${GRAFANA_PASSWORD:-admin}"
      GF_USERS_ALLOW_SIGN_UP: "false"
      GF_INSTALL_PLUGINS: "grafana-piechart-panel,grafana-clock-panel"
      GF_FEATURE_TOGGLES_ENABLE: "publicDashboards"
    depends_on:
      - prometheus

  prometheus:
    image: prom/prometheus:v2.51.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yaml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
      - "--storage.tsdb.retention.time=30d"
      - "--web.enable-lifecycle"

volumes:
  grafana_data:
  prometheus_data:

# kubernetes/grafana-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-dashboards
  namespace: monitoring
data:
  claude-agent-dashboard.json: |
    {  "title": "Claude Agent — Production Dashboard", ... }
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-datasources
  namespace: monitoring
data:
  datasources.yaml: |
    apiVersion: 1
    datasources:
      - name: Prometheus
        type: prometheus
        url: http://prometheus:9090
        isDefault: true
        uid: prometheus
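Como alternativa al provisioning por ficheros, el dashboard puede subirse vía la API HTTP de Grafana (`POST /api/dashboards/db`). Un boceto con urllib; la URL y el token son suposiciones del despliegue:

```python
import json
import urllib.request


def build_import_payload(dashboard: dict, overwrite: bool = True) -> dict:
    """Payload para POST /api/dashboards/db.

    'id' debe ir a null: Grafana crea o actualiza según el 'uid'."""
    return {"dashboard": {**dashboard, "id": None}, "overwrite": overwrite}


def upload_dashboard(base_url: str, api_token: str, dashboard: dict) -> None:
    req = urllib.request.Request(
        f"{base_url}/api/dashboards/db",
        data=json.dumps(build_import_payload(dashboard)).encode(),
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    urllib.request.urlopen(req)


# upload_dashboard("http://grafana:3000", "<service-account-token>", dashboard_json)
```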

Alertas en Grafana

En versiones anteriores a Grafana 9, las alertas se embebían directamente en el JSON del panel; con Unified Alerting (Grafana 9+), las reglas se gestionan por separado, vía UI, API o provisioning. El siguiente JSON muestra el formato legacy, que todavía aparece en dashboards exportados de versiones antiguas:

{
  "alert": {
    "name": "AgentHighErrorRate",
    "message": "Error rate supera 5% en agente {{ $labels.agent_name }}",
    "frequency": "1m",
    "handler": 1,
    "conditions": [
      {
        "type": "query",
        "query": {
          "model": {
            "conditions": [
              {
                "evaluator": { "params": [0.05], "type": "gt" },
                "operator": { "type": "and" },
                "query": { "params": ["error_rate", "5m", "now"] },
                "reducer": { "params": [], "type": "avg" }
              }
            ]
          }
        }
      }
    ],
    "notifications": [
      { "uid": "slack-webhook-claude-agents" },
      { "uid": "pagerduty-critical" }
    ]
  }
}

11. Distributed Tracing Avanzado con OpenTelemetry

Instrumentación Completa del Ciclo de Vida del Agente

# otel_advanced.py
"""
Instrumentación completa con OpenTelemetry: spans por herramienta,
llamadas LLM, context propagation entre agentes padre-hijo y baggage.
"""
from opentelemetry import trace, baggage, context
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace import StatusCode, SpanKind
import os
import time
import asyncio
from typing import AsyncGenerator
from claude_code_sdk import query, ClaudeCodeOptions


# ============================================================
# Setup del provider con exportadores múltiples
# ============================================================

def setup_otel(service_name: str = "claude-agent") -> trace.Tracer:
    """
    Configura OpenTelemetry con:
    - Exportador OTLP a Jaeger/Tempo
    - Resource con información del servicio
    - Propagadores W3C TraceContext + Baggage
    """
    resource = Resource.create({
        "service.name": service_name,
        "service.version": os.getenv("APP_VERSION", "0.0.0"),
        "service.instance.id": os.getenv("HOSTNAME", "local"),
        "deployment.environment": os.getenv("ENVIRONMENT", "production"),
        "telemetry.sdk.language": "python",
    })

    # OTLP exporter (compatible con Jaeger, Tempo, Honeycomb, Datadog)
    otlp_exporter = OTLPSpanExporter(
        endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
        insecure=os.getenv("ENVIRONMENT") != "production"
    )

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(provider)

    # Configurar propagadores para context propagation
    from opentelemetry import propagate
    propagate.set_global_textmap(CompositePropagator([
        TraceContextTextMapPropagator(),
        W3CBaggagePropagator(),
    ]))

    return trace.get_tracer(service_name)


tracer = setup_otel()


# ============================================================
# Baggage: pasar metadatos a través de spans
# ============================================================

def create_agent_context(
    session_id: str,
    user_id: str,
    request_id: str
) -> context.Context:
    """
    Crea un contexto con baggage para que todos los spans
    hijo hereden session_id, user_id y request_id.
    """
    ctx = context.get_current()
    ctx = baggage.set_baggage("session.id", session_id, context=ctx)
    ctx = baggage.set_baggage("user.id", user_id, context=ctx)
    ctx = baggage.set_baggage("request.id", request_id, context=ctx)
    return ctx


# ============================================================
# Span por herramienta con input/output
# ============================================================

class ToolSpanManager:
    """Gestiona spans de ciclo de vida de herramientas."""

    def __init__(self):
        self._active_spans: dict[str, trace.Span] = {}
        self._tool_start_times: dict[str, float] = {}

    def start_tool(self, tool_use_id: str, tool_name: str, tool_input: dict) -> None:
        """Abre un span cuando se llama una herramienta."""
        span = tracer.start_span(
            f"tool.{tool_name}",
            kind=SpanKind.INTERNAL,
            attributes={
                "tool.name": tool_name,
                "tool.use_id": tool_use_id,
                # Solo loggear inputs seguros (no secrets)
                "tool.input.type": str(type(tool_input).__name__),
                "tool.input.size": len(str(tool_input)),
            }
        )
        self._active_spans[tool_use_id] = span
        self._tool_start_times[tool_use_id] = time.monotonic()

    def end_tool(
        self,
        tool_use_id: str,
        output: str,
        is_error: bool = False
    ) -> None:
        """Cierra el span de la herramienta con el resultado."""
        span = self._active_spans.pop(tool_use_id, None)
        if span is None:
            return

        started = self._tool_start_times.pop(tool_use_id, time.monotonic())
        duration = time.monotonic() - started
        span.set_attribute("tool.output.bytes", len(output))
        span.set_attribute("tool.duration_seconds", round(duration, 3))

        if is_error:
            span.set_status(StatusCode.ERROR, output[:200])
        else:
            span.set_status(StatusCode.OK)

        span.end()

    def close_all(self) -> None:
        """Cierra todos los spans abiertos (para cleanup en error)."""
        for span in self._active_spans.values():
            span.set_status(StatusCode.ERROR, "span closed by cleanup")
            span.end()
        self._active_spans.clear()
        self._tool_start_times.clear()


# ============================================================
# Span por llamada LLM con tokens y costo
# ============================================================

async def run_with_full_tracing(
    prompt: str,
    agent_name: str,
    session_id: str,
    user_id: str,
    parent_context: dict | None = None
) -> str:
    """
    Ejecuta el agente con trazas completas:
    - Span raíz por query
    - Span por cada llamada al LLM (con modelo, tokens)
    - Span por cada herramienta (con input size, output size, duración)
    - Context propagation con baggage
    """
    from opentelemetry import propagate

    # Extraer contexto padre si viene de HTTP headers
    ctx = propagate.extract(parent_context or {})

    # Agregar baggage SOBRE el contexto padre (no sobre el contexto actual):
    # así el span raíz y todos sus hijos heredan los mismos metadatos
    ctx = baggage.set_baggage("session.id", session_id, context=ctx)
    ctx = baggage.set_baggage("user.id", user_id, context=ctx)
    ctx = baggage.set_baggage("request.id", prompt[:20], context=ctx)
    token = context.attach(ctx)

    tool_manager = ToolSpanManager()
    results = []
    llm_call_number = 0

    try:
        with tracer.start_as_current_span(
            "agent.query",
            context=ctx,
            kind=SpanKind.SERVER,
            attributes={
                "agent.name": agent_name,
                "agent.session_id": session_id,
                "user.id": user_id,
                "prompt.length": len(prompt),
                "prompt.preview": prompt[:100],
            }
        ) as root_span:

            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    allowed_tools=["Read", "Edit", "Bash"],
                    max_turns=50
                )
            ):
                msg_type = type(message).__name__

                # Span para cada turno del LLM
                if msg_type == "AssistantMessage":
                    llm_call_number += 1
                    with tracer.start_as_current_span(
                        "llm.completion",
                        kind=SpanKind.CLIENT,
                        attributes={
                            "llm.model": "claude-opus-4-5",
                            "llm.call_number": llm_call_number,
                            "llm.vendor": "anthropic",
                        }
                    ) as llm_span:
                        content = str(getattr(message, 'content', ''))
                        llm_span.set_attribute("llm.response.length", len(content))

                # Inicio de tool call
                elif msg_type == "ToolUseBlock":
                    tool_name = getattr(message, 'name', 'unknown')
                    tool_id = getattr(message, 'id', 'no-id')
                    tool_input = getattr(message, 'input', {})
                    tool_manager.start_tool(tool_id, tool_name, tool_input)

                # Fin de tool call
                elif msg_type == "ToolResultBlock":
                    tool_id = getattr(message, 'tool_use_id', 'no-id')
                    output = str(getattr(message, 'content', ''))
                    is_error = bool(getattr(message, 'is_error', False))
                    tool_manager.end_tool(tool_id, output, is_error)

                # Mensaje de resultado final
                elif hasattr(message, 'cost_usd') and message.cost_usd:
                    cost = float(message.cost_usd)
                    root_span.set_attribute("agent.cost_usd", cost)

                    usage = getattr(message, 'usage', None)
                    if usage:
                        root_span.set_attribute("llm.input_tokens",
                            getattr(usage, 'input_tokens', 0))
                        root_span.set_attribute("llm.output_tokens",
                            getattr(usage, 'output_tokens', 0))
                        root_span.set_attribute("llm.cache_read_tokens",
                            getattr(usage, 'cache_read_input_tokens', 0))

                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))

            root_span.set_attribute("agent.llm_calls", llm_call_number)
            root_span.set_status(StatusCode.OK)

    except Exception as e:
        tool_manager.close_all()
        raise
    finally:
        context.detach(token)

    return "\n".join(results)

Context Propagation entre Agentes Padre-Hijo

# otel_propagation.py
"""
Context propagation: el agente padre pasa su trace context al hijo
para que aparezcan en el mismo trace en Jaeger/Tempo.
"""
from opentelemetry import propagate, trace
from opentelemetry.trace import SpanKind
from claude_code_sdk import query, ClaudeCodeOptions


async def parent_agent(task: str) -> str:
    """Agente padre que lanza subagentes."""

    with tracer.start_as_current_span(
        "parent_agent.run",
        kind=SpanKind.PRODUCER,
        attributes={"task.description": task[:100]}
    ) as parent_span:

        # Serializar contexto para propagarlo al hijo
        carrier: dict[str, str] = {}
        propagate.inject(carrier)

        # El carrier ahora contiene: {"traceparent": "00-...-...-01"}
        # Este header puede pasarse via HTTP, queue message, etc.

        # Llamar subagente pasando el carrier
        child_result = await child_agent(
            prompt=f"Analiza el siguiente task: {task}",
            parent_trace_carrier=carrier
        )

        parent_span.set_attribute("child.result.length", len(child_result))
        return child_result


async def child_agent(
    prompt: str,
    parent_trace_carrier: dict[str, str]
) -> str:
    """Subagente que continúa la traza del padre."""

    # Extraer el contexto padre del carrier
    parent_ctx = propagate.extract(parent_trace_carrier)

    with tracer.start_as_current_span(
        "child_agent.run",
        context=parent_ctx,  # Continuar traza del padre
        kind=SpanKind.CONSUMER,
        attributes={"prompt.preview": prompt[:100]}
    ) as child_span:

        results = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(
                allowed_tools=["Read", "Bash"],
                max_turns=20
            )
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))

        child_span.set_attribute("result.chunks", len(results))
        return "\n".join(results)
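El `traceparent` que viaja en el carrier sigue el formato del estándar W3C Trace Context: `versión-trace_id-span_id-flags`. Un parser mínimo con stdlib ilustra qué transporta exactamente ese header (boceto ilustrativo, independiente del SDK de OpenTelemetry; `parse_traceparent` es un nombre propio de este ejemplo):

```python
# parse_traceparent.py — desarma un header traceparent W3C en sus campos
def parse_traceparent(header: str) -> dict:
    """Descompone 'version-trace_id-span_id-flags' en sus componentes."""
    version, trace_id, span_id, flags = header.split("-")
    assert len(trace_id) == 32 and len(span_id) == 16, "IDs hex de 128/64 bits"
    return {
        "version": version,
        "trace_id": trace_id,     # 128 bits en hex: identifica el trace completo
        "span_id": span_id,       # 64 bits en hex: span padre inmediato
        "sampled": flags == "01"  # flag de sampling
    }


parsed = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
```

Es este mismo header el que el subagente recibe en `parent_trace_carrier` y del que `propagate.extract` reconstruye el contexto padre.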

Exportar a Jaeger, Zipkin, Honeycomb y Tempo

# otel_exporters.py
"""
Configuración de exportadores para diferentes backends.
El código del agente es idéntico — solo cambia el exporter.
"""
import os
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def get_exporter(backend: str):
    """Retorna el exportador según el backend configurado."""

    if backend == "jaeger":
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
        return OTLPSpanExporter(
            endpoint=os.getenv("JAEGER_ENDPOINT", "http://localhost:4317"),
            insecure=True
        )

    elif backend == "zipkin":
        from opentelemetry.exporter.zipkin.json import ZipkinExporter
        return ZipkinExporter(
            endpoint=os.getenv("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans")
        )

    elif backend == "honeycomb":
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
        return OTLPSpanExporter(
            endpoint="https://api.honeycomb.io:443",
            headers={
                "x-honeycomb-team": os.environ["HONEYCOMB_API_KEY"],
                "x-honeycomb-dataset": os.getenv("HONEYCOMB_DATASET", "claude-agents"),
            }
        )

    elif backend == "tempo":
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
        return OTLPSpanExporter(
            endpoint=os.getenv("TEMPO_ENDPOINT", "http://localhost:4317"),
            insecure=True
        )

    else:
        # Console exporter para desarrollo
        from opentelemetry.sdk.trace.export import ConsoleSpanExporter
        return ConsoleSpanExporter()


# Seleccionar backend via variable de entorno
OTEL_BACKEND = os.getenv("OTEL_BACKEND", "jaeger")
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(get_exporter(OTEL_BACKEND)))

TypeScript: Instrumentación con OpenTelemetry

// otel-agent.ts
import { trace, SpanStatusCode, SpanKind, context, propagation } from "@opentelemetry/api";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
import { Resource } from "@opentelemetry/resources";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

// Setup del tracer
const resource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: "claude-agent-ts",
  [SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV ?? "production",
});

const provider = new NodeTracerProvider({ resource });
provider.addSpanProcessor(
  new BatchSpanProcessor(
    new OTLPTraceExporter({
      url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://localhost:4317",
    })
  )
);
provider.register();

const tracer = trace.getTracer("claude-agent-ts");

// Agente con instrumentación completa
async function instrumentedAgent(
  prompt: string,
  agentName: string,
  parentCarrier?: Record<string, string>
): Promise<string> {
  // Extraer contexto padre si existe
  const parentCtx = parentCarrier
    ? propagation.extract(context.active(), parentCarrier)
    : context.active();

  return new Promise((resolve, reject) => {
    context.with(parentCtx, async () => {
      const span = tracer.startSpan("agent.query", {
        kind: SpanKind.SERVER,
        attributes: {
          "agent.name": agentName,
          "prompt.length": prompt.length,
          "prompt.preview": prompt.slice(0, 100),
        },
      });

      const ctx = trace.setSpan(context.active(), span);
      const results: string[] = [];
      let llmCalls = 0;

      try {
        await context.with(ctx, async () => {
          for await (const message of query(prompt, {
            allowedTools: ["Read", "Edit", "Bash"],
            maxTurns: 50,
          } as ClaudeCodeOptions)) {
            const msgType = (message as { type?: string }).type;

            if (msgType === "assistant") {
              llmCalls++;
              const llmSpan = tracer.startSpan("llm.completion", {
                kind: SpanKind.CLIENT,
                attributes: {
                  "llm.model": "claude-opus-4-5",
                  "llm.call_number": llmCalls,
                },
              });
              llmSpan.setStatus({ code: SpanStatusCode.OK });
              llmSpan.end();
            }

            if (msgType === "tool_use") {
              const toolName = (message as { name?: string }).name ?? "unknown";
              const toolSpan = tracer.startSpan(`tool.${toolName}`, {
                kind: SpanKind.INTERNAL,
                attributes: { "tool.name": toolName },
              });
              toolSpan.setStatus({ code: SpanStatusCode.OK });
              toolSpan.end();
            }

            if ("cost_usd" in message) {
              span.setAttribute("agent.cost_usd", Number(message.cost_usd) || 0);
            }

            if ("content" in message) {
              const content = String((message as { content: unknown }).content);
              if (content) results.push(content);
            }
          }
        });

        span.setAttribute("agent.llm_calls", llmCalls);
        span.setStatus({ code: SpanStatusCode.OK });
        resolve(results.join("\n"));
      } catch (err) {
        span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
        span.recordException(err as Error);
        reject(err);
      } finally {
        span.end();
      }
    });
  });
}

export { instrumentedAgent, tracer };

12. Log Correlation y Búsqueda

Correlacionar Logs de Múltiples Servicios via trace_id

Cuando tienes múltiples servicios (API gateway, agente, worker), todos los logs de una misma request deben tener el mismo trace_id. Así puedes buscar todos los logs de una sesión en Loki/Elasticsearch.

# log_correlation.py
"""
Correlación de logs con trace_id de OpenTelemetry.
Todos los logs dentro de un span incluyen automáticamente trace_id y span_id.
"""
import structlog
from opentelemetry import trace
from opentelemetry.trace import format_trace_id, format_span_id


def add_otel_context(logger, method, event_dict):
    """
    Processor de structlog que agrega trace_id y span_id al log
    desde el span activo de OpenTelemetry.
    """
    span = trace.get_current_span()
    if span and span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format_trace_id(ctx.trace_id)
        event_dict["span_id"] = format_span_id(ctx.span_id)
    return event_dict


def configure_correlated_logging():
    """
    Configura structlog con correlación automática de traces.
    Cada log tendrá: trace_id, span_id, timestamp, level.
    """
    import structlog

    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.stdlib.add_log_level,
            structlog.contextvars.merge_contextvars,
            add_otel_context,          # <-- agrega trace_id, span_id
            structlog.processors.JSONRenderer(),
        ],
        logger_factory=structlog.PrintLoggerFactory(),
    )


# Log ejemplo con correlación:
# {
#   "timestamp": "2026-03-23T10:15:30.123Z",
#   "level": "info",
#   "event": "tool_called",
#   "tool": "Read",
#   "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
#   "span_id": "00f067aa0ba902b7",
#   "session_id": "abc123",
#   "query_id": "d4e5f6"
# }
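Con logs JSON de una línea como el anterior, la correlación local (sin Loki) se reduce a filtrar por `trace_id`. Un boceto con stdlib; los registros de ejemplo y el nombre `logs_for_trace` son ilustrativos:

```python
import json

# Logs JSON por línea, como los produce la configuración anterior
raw_logs = [
    '{"event": "query_received", "trace_id": "aaa111", "session_id": "abc123"}',
    '{"event": "tool_called", "trace_id": "aaa111", "tool": "Read"}',
    '{"event": "query_received", "trace_id": "bbb222", "session_id": "xyz789"}',
]


def logs_for_trace(lines: list[str], trace_id: str) -> list[dict]:
    """Devuelve los logs (parseados) que pertenecen a un mismo trace."""
    parsed = (json.loads(line) for line in lines)
    return [entry for entry in parsed if entry.get("trace_id") == trace_id]


matching = logs_for_trace(raw_logs, "aaa111")
```

Loki y Elasticsearch hacen exactamente esta operación, pero indexada y a escala.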

Loki + Grafana para Búsqueda de Logs

# docker-compose con stack completo de logs
services:
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    volumes:
      - ./loki-config.yaml:/etc/loki/local-config.yaml
      - loki_data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yml
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
    command: -config.file=/etc/promtail/config.yml

  grafana:
    image: grafana/grafana:10.4.0
    environment:
      GF_DATASOURCES_DEFAULT_TYPE: loki
    volumes:
      - ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/loki.yaml

volumes:
  loki_data:

# promtail-config.yaml: scrapear logs JSON del agente
scrape_configs:
  - job_name: claude-agent
    static_configs:
      - targets: ["localhost"]
        labels:
          job: claude-agent
          __path__: /app/logs/agent*.log
    pipeline_stages:
      - json:
          expressions:
            trace_id: trace_id
            session_id: session_id
            level: level
            event: event
            agent_name: agent_name
      - labels:
          trace_id:
          session_id:
          level:
          agent_name:

Queries Útiles en Loki (LogQL)

# Todos los logs de una sesión específica
{job="claude-agent"} |= `"session_id":"abc123"`

# Logs de error en los últimos 30 minutos
{job="claude-agent"} |= `"level":"error"` | json | line_format "{{.timestamp}} {{.event}} {{.error}}"

# Todos los logs de un trace_id (ver request completa)
{job="claude-agent"} |= `"trace_id":"4bf92f3577b34da6a"` | json | line_format "{{.span_id}} {{.event}}"

# Tool calls con outputs grandes (output_bytes > 10000)
{job="claude-agent"} |= `"event":"tool_completed"` | json | output_bytes > 10000

# Contar errores por tipo en la última hora
sum by (error_type) (
  count_over_time({job="claude-agent"} |= `"level":"error"` | json [1h])
)

# Latencia promedio por agente (usando duration_seconds)
avg by (agent_name) (
  avg_over_time({job="claude-agent"} |= `"event":"query_completed"` | json | unwrap duration_seconds [5m])
)

Elasticsearch/OpenSearch para Logs de Agentes

# elasticsearch_logger.py
"""
Enviar logs directamente a Elasticsearch para búsqueda full-text.
Útil para logs de razonamiento del agente (texto no estructurado).
"""
from elasticsearch import AsyncElasticsearch

# Cliente async; desde elasticsearch-py 8.x los hosts se pasan como URLs
es_client = AsyncElasticsearch(hosts=["http://localhost:9200"])

INDEX_NAME = "claude-agent-logs"


async def index_agent_log(log_entry: dict) -> None:
    """Indexa un log en Elasticsearch."""
    await es_client.index(
        index=f"{INDEX_NAME}-{log_entry.get('date', 'unknown')}",
        document=log_entry
    )


async def search_session_logs(session_id: str) -> list[dict]:
    """Busca todos los logs de una sesión."""
    result = await es_client.search(
        index=f"{INDEX_NAME}-*",
        body={
            "query": {
                "term": {"session_id.keyword": session_id}
            },
            "sort": [{"@timestamp": {"order": "asc"}}],
            "size": 1000
        }
    )
    return [hit["_source"] for hit in result["hits"]["hits"]]


async def find_expensive_sessions(
    min_cost_usd: float = 0.10,
    last_hours: int = 24
) -> list[dict]:
    """
    Encuentra sesiones que costaron más de min_cost_usd
    en las últimas last_hours horas.
    """
    result = await es_client.search(
        index=f"{INDEX_NAME}-*",
        body={
            "query": {
                "bool": {
                    "must": [
                        {"term": {"event": "query_completed"}},
                        {"range": {
                            "@timestamp": {"gte": f"now-{last_hours}h"}
                        }},
                        {"range": {"cost_usd": {"gte": min_cost_usd}}}
                    ]
                }
            },
            "aggs": {
                "by_session": {
                    "terms": {"field": "session_id.keyword"},
                    "aggs": {
                        "total_cost": {"sum": {"field": "cost_usd"}}
                    }
                }
            }
        }
    )
    return result["aggregations"]["by_session"]["buckets"]

13. SLOs y Error Budgets para Agentes

Definir SLOs para Agentes Autónomos

Los SLOs (Service Level Objectives) para agentes son más complejos que para APIs porque las variables son más ricas: latencia variable, costo por query, tasa de éxito y calidad del resultado.

# slo_definitions.py
"""
Definición de SLOs para agentes de IA.
Estructura: SLO = indicador + objetivo + ventana de tiempo.
"""
from dataclasses import dataclass
from enum import Enum


class SLOStatus(str, Enum):
    HEALTHY = "healthy"
    WARNING = "warning"
    BURNING = "burning"     # Burn rate alto: agotando el budget
    EXHAUSTED = "exhausted" # Budget agotado


@dataclass
class AgentSLO:
    name: str
    description: str
    objective: float          # 0.0 a 1.0 (ej: 0.99 = 99%)
    window_days: int          # Ventana de evaluación en días
    metric_query: str         # Query Prometheus para el indicador


# SLOs del agente de producción
AGENT_SLOS = [
    AgentSLO(
        name="availability",
        description="El agente completa el 99% de los queries sin error",
        objective=0.99,
        window_days=30,
        metric_query="""
            sum(rate(agent_queries_total{status="success"}[30d]))
            /
            sum(rate(agent_queries_total[30d]))
        """
    ),
    AgentSLO(
        name="latency_p95",
        description="El 95% de los queries completan en menos de 60 segundos",
        objective=0.95,
        window_days=30,
        metric_query="""
            histogram_quantile(0.95,
                rate(agent_query_duration_seconds_bucket[30d])
            ) < 60
        """
    ),
    AgentSLO(
        name="cost_per_query",
        description="El costo promedio por query es menor a $0.10",
        objective=0.95,
        window_days=7,
        metric_query="""
            avg(rate(agent_cost_usd_total[7d]))
            /
            avg(rate(agent_queries_total[7d]))
            < 0.10
        """
    ),
]

Cálculo del Error Budget

# error_budget.py
"""
Error budget: cuánto "error" puedes incurrir en el periodo de la ventana.

Para un SLO de 99% en 30 días:
  - Total de minutos: 30 * 24 * 60 = 43,200 minutos
  - Error budget: (1 - 0.99) * 43,200 = 432 minutos de downtime permitidos
"""
from typing import NamedTuple

from slo_definitions import SLOStatus  # enum definido en la sección anterior


class ErrorBudgetStatus(NamedTuple):
    slo_name: str
    objective: float
    current_sli: float           # Valor actual del indicador
    error_budget_total: float    # Budget total en el periodo
    error_budget_remaining: float # Budget restante
    burn_rate: float             # Cuán rápido se consume el budget
    status: SLOStatus


def calculate_error_budget(
    objective: float,
    current_sli: float,
    window_minutes: int
) -> ErrorBudgetStatus:
    """
    Calcula el estado del error budget.

    Args:
        objective: SLO objetivo (ej: 0.99)
        current_sli: Valor actual del indicador (ej: 0.987)
        window_minutes: Duración de la ventana en minutos
    """
    allowed_failure_rate = 1.0 - objective
    actual_failure_rate = 1.0 - current_sli

    total_budget = allowed_failure_rate * window_minutes
    consumed = actual_failure_rate * window_minutes
    remaining = total_budget - consumed
    burn_rate = actual_failure_rate / allowed_failure_rate if allowed_failure_rate > 0 else 0

    if burn_rate < 1.0:
        status = SLOStatus.HEALTHY
    elif burn_rate < 2.0:
        status = SLOStatus.WARNING
    elif burn_rate < 5.0:
        status = SLOStatus.BURNING
    else:
        status = SLOStatus.EXHAUSTED

    return ErrorBudgetStatus(
        slo_name="availability",
        objective=objective,
        current_sli=current_sli,
        error_budget_total=total_budget,
        error_budget_remaining=remaining,
        burn_rate=burn_rate,
        status=status,
    )


# Ejemplo:
# SLO 99%, ventana 30 días (43,200 min), SLI actual 98.7%
# budget = 0.01 * 43200 = 432 min
# consumed = 0.013 * 43200 = 561 min  --> budget agotado!
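La aritmética del ejemplo anterior puede comprobarse directamente (verificación numérica autocontenida, sin dependencias):

```python
# Verificación del ejemplo: SLO 99%, ventana de 30 días, SLI actual 98.7%
objective = 0.99
current_sli = 0.987
window_minutes = 30 * 24 * 60                      # 43_200 minutos

budget_total = (1 - objective) * window_minutes    # minutos de error permitidos
consumed = (1 - current_sli) * window_minutes      # minutos de error incurridos
burn_rate = (1 - current_sli) / (1 - objective)    # velocidad de consumo

# budget agotado: consumed (≈561.6) supera budget_total (432.0)
print(round(budget_total), round(consumed, 1), round(burn_rate, 2))
```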

Alertas Basadas en Burn Rate

Las alertas de burn rate son más precisas que alertas simples de threshold porque detectan tendencias antes de que el budget se agote.

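La regla general detrás de estos umbrales: un burn rate de `N`x agota el budget de una ventana de `W` días en `W/N` días, y el error rate que dispara la alerta es `N * (1 - SLO)`. Un boceto autocontenido de ese cálculo (`burn_rate_alert` es un nombre ilustrativo):

```python
def burn_rate_alert(slo: float, burn_factor: float, window_days: int = 30) -> tuple[float, float]:
    """Umbral de error rate de la alerta y días hasta agotar el budget a ese ritmo."""
    budget = 1.0 - slo                        # ej: 0.01 para un SLO de 99%
    threshold = burn_factor * budget          # error rate que dispara la alerta
    days_to_exhaustion = window_days / burn_factor
    return threshold, days_to_exhaustion


# Alerta crítica del YAML siguiente: burn rate 14.4x sobre SLO 99%
threshold, days = burn_rate_alert(slo=0.99, burn_factor=14.4)
# threshold ≈ 0.144 (14.4% de error rate); budget de 30 días agotado en ~2.1 días
```

De ahí el factor `14.4 * 0.01` de la primera regla y el `3 * 0.01` de la segunda.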
# slo-alerts.yaml
groups:
  - name: slo-error-budget
    rules:
      # Alerta 1: burn rate alto en 1 hora (consume el budget en ~2 días)
      - alert: AgentSLOBurnRateHigh
        expr: |
          (
            rate(agent_queries_total{status="error"}[1h])
            /
            rate(agent_queries_total[1h])
          ) > 14.4 * 0.01
        for: 5m
        labels:
          severity: critical
          slo: availability
        annotations:
          summary: "Burn rate del SLO de disponibilidad es 14.4x (agotará el budget en ~2 días)"
          description: |
            El error rate actual agotará el error budget mensual en ~2 días si continúa.
            Error rate actual: {{ $value | humanizePercentage }}
            Error rate objetivo: 1%

      # Alerta 2: burn rate moderado en 6 horas (consume el budget en ~10 días)
      - alert: AgentSLOBurnRateModerate
        expr: |
          (
            rate(agent_queries_total{status="error"}[6h])
            /
            rate(agent_queries_total[6h])
          ) > 3 * 0.01
        for: 15m
        labels:
          severity: warning
          slo: availability
        annotations:
          summary: "Burn rate del SLO es 3x — revisar antes de que sea crítico"

      # Alerta 3: menos del 10% del budget restante
      - alert: AgentErrorBudgetAlmostExhausted
        expr: |
          1 - (
            sum(rate(agent_queries_total{status="success"}[30d]))
            /
            sum(rate(agent_queries_total[30d]))
          ) > 0.009
        for: 1h
        labels:
          severity: critical
        annotations:
          summary: "Menos del 10% del error budget restante para el mes"

Dashboard de SLO Compliance

{
  "title": "SLO Compliance — Claude Agent",
  "panels": [
    {
      "title": "Disponibilidad (SLO: 99%)",
      "type": "stat",
      "targets": [
        {
          "expr": "sum(rate(agent_queries_total{status='success'}[30d])) / sum(rate(agent_queries_total[30d]))",
          "legendFormat": "Disponibilidad"
        }
      ],
      "options": {
        "thresholds": {
          "steps": [
            { "value": null, "color": "red" },
            { "value": 0.99, "color": "green" }
          ]
        }
      },
      "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0.98, "max": 1 } }
    },
    {
      "title": "Error Budget Remaining (30d)",
      "type": "gauge",
      "targets": [
        {
          "expr": "1 - (sum(rate(agent_queries_total{status='error'}[30d])) / sum(rate(agent_queries_total[30d]))) / 0.01",
          "legendFormat": "Budget restante"
        }
      ],
      "options": {
        "minValue": 0,
        "maxValue": 1,
        "thresholds": {
          "steps": [
            { "value": null, "color": "red" },
            { "value": 0.1, "color": "orange" },
            { "value": 0.5, "color": "yellow" },
            { "value": 0.8, "color": "green" }
          ]
        }
      },
      "fieldConfig": { "defaults": { "unit": "percentunit" } }
    }
  ]
}

14. Chaos Engineering para Agentes

Probar Resiliencia Inyectando Fallos

El chaos engineering verifica que los mecanismos de resiliencia (retry, circuit breaker, fallback) realmente funcionan antes de que ocurra un fallo real en producción.

# chaos_tools.py
"""
Chaos engineering para agentes: herramientas que fallan intencionalmente
para verificar que el sistema se recupera correctamente.
"""
import random
import asyncio
import time
from functools import wraps
from typing import Callable, Any


class ChaosConfig:
    """Configuración del motor de chaos."""

    def __init__(
        self,
        failure_rate: float = 0.20,          # 20% de probabilidad de fallo
        latency_injection_rate: float = 0.15, # 15% de probabilidad de latencia
        extra_latency_seconds: float = 10.0,  # Latencia extra inyectada
        enabled: bool = True
    ):
        self.failure_rate = failure_rate
        self.latency_injection_rate = latency_injection_rate
        self.extra_latency_seconds = extra_latency_seconds
        self.enabled = enabled


class ChaosMonkey:
    """
    Monkey patching de tools para inyectar fallos.
    Solo para testing — NUNCA en producción.
    """

    def __init__(self, config: ChaosConfig):
        self.config = config
        self._injected_failures: list[dict] = []
        self._injected_latencies: list[dict] = []

    def chaotic_tool(self, tool_name: str, original_func: Callable) -> Callable:
        """
        Envuelve una función de tool con inyección de fallos.
        """
        config = self.config
        monkey = self

        @wraps(original_func)
        async def wrapper(*args, **kwargs):
            if not config.enabled:
                return await original_func(*args, **kwargs)

            # Inyectar latencia
            if random.random() < config.latency_injection_rate:
                latency = random.uniform(1.0, config.extra_latency_seconds)
                monkey._injected_latencies.append({
                    "tool": tool_name,
                    "latency_seconds": round(latency, 2),
                    "timestamp": time.time()
                })
                print(f"[CHAOS] Inyectando {latency:.1f}s de latencia en {tool_name}")
                await asyncio.sleep(latency)

            # Inyectar fallo
            if random.random() < config.failure_rate:
                error_msg = f"[CHAOS] Fallo inyectado en {tool_name}"
                monkey._injected_failures.append({
                    "tool": tool_name,
                    "error": error_msg,
                    "timestamp": time.time()
                })
                print(error_msg)
                raise RuntimeError(error_msg)

            return await original_func(*args, **kwargs)

        return wrapper

    @property
    def failure_summary(self) -> dict:
        return {
            "total_failures": len(self._injected_failures),
            "total_latencies": len(self._injected_latencies),
            "failures_by_tool": {
                f["tool"]: sum(1 for x in self._injected_failures if x["tool"] == f["tool"])
                for f in self._injected_failures
            }
        }
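La misma idea, reducida a un decorador autocontenido con semilla fija para que el experimento sea reproducible en tests (los nombres `chaotic`, `fake_tool` y `run_experiment` son ilustrativos):

```python
import asyncio
import random
from functools import wraps


def chaotic(failure_rate: float, rng: random.Random):
    """Decorador mínimo: inyecta un fallo con probabilidad failure_rate."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if rng.random() < failure_rate:
                raise RuntimeError(f"[CHAOS] fallo inyectado en {func.__name__}")
            return await func(*args, **kwargs)
        return wrapper
    return decorator


async def run_experiment(trials: int = 100) -> tuple[int, int]:
    rng = random.Random(42)  # semilla fija: el experimento es determinista

    @chaotic(failure_rate=0.5, rng=rng)
    async def fake_tool() -> str:
        return "ok"

    ok = failed = 0
    for _ in range(trials):
        try:
            await fake_tool()
            ok += 1
        except RuntimeError:
            failed += 1
    return ok, failed


ok, failed = asyncio.run(run_experiment())
```

Con la semilla fija, la mezcla de éxitos y fallos es siempre la misma, lo que permite asertar sobre el comportamiento del retry en CI.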

Chaos Hook — Herramienta que Falla Aleatoriamente

# chaos_hook.py
"""
Hook de PreToolUse que inyecta fallos en herramientas específicas.
Se integra con el SDK via el sistema de hooks.
"""
import random
import json
import os


def create_chaos_pre_tool_hook(
    tools_to_chaos: list[str],
    failure_rate: float = 0.20,
    only_in_environments: list[str] | None = None
) -> str:
    """
    Genera el script de hook que inyecta fallos.
    Retorna el path al script generado.
    """
    env_check = ""
    if only_in_environments:
        envs = json.dumps(only_in_environments)
        env_check = f"""
import os
allowed_envs = {envs}
current_env = os.getenv("ENVIRONMENT", "production")
if current_env not in allowed_envs:
    import sys; sys.exit(0)  # No aplicar chaos en producción
"""

    script_content = f"""#!/usr/bin/env python3
\"\"\"
Chaos Engineering Hook — PreToolUse
SOLO para testing — NO usar en producción
\"\"\"
import sys
import json
import random
{env_check}

hook_input = json.loads(sys.stdin.read())
tool_name = hook_input.get("tool_name", "")
chaos_tools = {json.dumps(tools_to_chaos)}
failure_rate = {failure_rate}

if tool_name in chaos_tools and random.random() < failure_rate:
    # Devolver error al agente (el agente deberá manejar el fallo)
    output = {{
        "decision": "block",
        "reason": f"[CHAOS] Fallo inyectado en herramienta {{tool_name}} para prueba de resiliencia"
    }}
    print(json.dumps(output))
    sys.exit(0)

# Si no hay fallo, dejar pasar
print(json.dumps({{"decision": "approve"}}))
"""

    hook_path = "/tmp/chaos_pre_tool_hook.py"
    with open(hook_path, "w") as f:
        f.write(script_content)

    os.chmod(hook_path, 0o755)
    return hook_path
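La lógica de decisión que el script generado ejecuta por cada tool call puede aislarse y probarse sin pasar por stdin (boceto; `decide` es un nombre ilustrativo, no parte del SDK):

```python
import random


def decide(tool_name: str, chaos_tools: list[str], failure_rate: float,
           rng: random.Random) -> dict:
    """Replica la decisión del hook: bloquear con probabilidad failure_rate."""
    if tool_name in chaos_tools and rng.random() < failure_rate:
        return {
            "decision": "block",
            "reason": f"[CHAOS] Fallo inyectado en {tool_name}",
        }
    return {"decision": "approve"}


rng = random.Random(0)
decisions = [decide("Bash", ["Bash"], 0.2, rng)["decision"] for _ in range(50)]

# Las tools fuera de la lista nunca se bloquean, incluso con failure_rate=1.0
safe = decide("Read", ["Bash"], 1.0, rng)
```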

Suite de Tests de Chaos

# chaos_test_suite.py
"""
Suite completa de chaos engineering para el agente.
Verifica que los mecanismos de resiliencia funcionan correctamente.
"""
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from claude_code_sdk import query, ClaudeCodeOptions


class ChaosTestSuite:
    """Suite de tests que verifica resiliencia bajo condiciones caóticas."""

    async def test_retry_on_transient_failure(self) -> bool:
        """
        Verifica que el agente reintenta correctamente cuando una herramienta
        falla de forma transitoria.
        """
        call_count = 0

        # Simular fallo en el primer intento, éxito en el segundo
        async def failing_then_succeeding_query(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise RuntimeError("Simulated transient failure")

            # Mock de un generador async con un único mensaje de resultado
            async def mock_messages():
                yield type("Result", (), {"content": "Success after retry", "cost_usd": 0.01})()
            return mock_messages()

        try:
            # Parchear query para que el agente con retry enfrente el fallo simulado
            from your_agent_module import agent_with_retry  # noqa
            with patch("your_agent_module.query", side_effect=failing_then_succeeding_query):
                result = await agent_with_retry(
                    prompt="test task",
                    tools=["Read"],
                    max_retries=3,
                    base_delay=0.1  # Delay corto para tests
                )
            assert result is not None
            print("✓ Retry test passed")
            return True
        except Exception as e:
            print(f"✗ Retry test failed: {e}")
            return False

    async def test_circuit_breaker_opens_on_failures(self) -> bool:
        """
        Verifies that the circuit breaker opens after N consecutive failures.
        """
        from your_agent_module import AgentCircuitBreaker, CircuitState  # noqa

        breaker = AgentCircuitBreaker(
            failure_threshold=3,
            reset_timeout=1.0
        )

        # Trigger 3 consecutive failures by forcing the failure path directly
        for _ in range(3):
            breaker._on_failure()

        assert breaker.state == CircuitState.OPEN, f"Expected OPEN, got {breaker.state}"

        # Try to call with the circuit open — it must fail immediately
        try:
            await breaker.call("test", ["Read"])
            print("✗ Circuit breaker should have rejected the call")
            return False
        except RuntimeError as e:
            if "ABIERTO" in str(e) or "OPEN" in str(e):
                print("✓ Circuit breaker test passed")
                return True
            raise

    async def test_cost_budget_enforced(self) -> bool:
        """
        Verifies that the agent stops when the cost budget is exceeded.
        """
        # The CostBudgetPlugin should raise an error
        print("✓ Cost budget test requires integration with PluggableAgentRunner")
        return True

    async def run_all(self) -> dict[str, bool]:
        """Runs the whole suite and returns the results."""
        results = {}
        tests = [
            ("retry_on_transient_failure", self.test_retry_on_transient_failure),
            ("circuit_breaker_opens", self.test_circuit_breaker_opens_on_failures),
            ("cost_budget_enforced", self.test_cost_budget_enforced),
        ]

        for test_name, test_func in tests:
            print(f"\nRunning: {test_name}")
            try:
                result = await test_func()
                results[test_name] = result
            except Exception as e:
                print(f"✗ {test_name} raised exception: {e}")
                results[test_name] = False

        passed = sum(1 for r in results.values() if r)
        total = len(results)
        print(f"\n{'='*40}")
        print(f"Result: {passed}/{total} tests passed")
        return results


# Run the suite
if __name__ == "__main__":
    suite = ChaosTestSuite()
    asyncio.run(suite.run_all())
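The suite imports `agent_with_retry` from a hypothetical `your_agent_module`. A minimal sketch of what such a helper could look like — here reduced to a generic coroutine factory rather than the `prompt`/`tools` signature the test assumes — is exponential backoff with jitter:

```python
import asyncio
import random


async def agent_with_retry(run_once, max_retries: int = 3, base_delay: float = 1.0):
    """Retry a full agent run with exponential backoff and jitter.
    `run_once` is a zero-argument coroutine function (e.g. one complete query loop)."""
    for attempt in range(max_retries + 1):
        try:
            return await run_once()
        except Exception:
            if attempt == max_retries:
                raise  # retry budget exhausted: surface the last error
            # 2^attempt growth, randomized to avoid synchronized retry storms
            delay = base_delay * (2 ** attempt) * (0.5 + random.random())
            await asyncio.sleep(delay)


# Demo: a run that fails twice, then succeeds on the third attempt
calls = {"n": 0}

async def flaky_run():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(agent_with_retry(flaky_run, max_retries=3, base_delay=0.01))
print(result, calls["n"])  # → ok 3
```

The jitter factor matters in production: without it, many agents that failed at the same time retry at the same time.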

TypeScript: Chaos Testing

// chaos-test.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface ChaosConfig {
  failureRate: number;
  latencyMs: number;
  latencyRate: number;
}

function wrapWithChaos<T extends (...args: unknown[]) => Promise<unknown>>(
  fn: T,
  config: ChaosConfig
): T {
  return (async (...args: unknown[]) => {
    // Inject latency
    if (Math.random() < config.latencyRate) {
      const latency = Math.random() * config.latencyMs;
      console.log(`[CHAOS] Injecting ${latency.toFixed(0)}ms latency`);
      await new Promise((r) => setTimeout(r, latency));
    }

    // Inject failure
    if (Math.random() < config.failureRate) {
      console.log("[CHAOS] Injecting failure");
      throw new Error("[CHAOS] Injected failure for resilience testing");
    }

    return fn(...args);
  }) as T;
}

async function runChaosTest(): Promise<void> {
  const config: ChaosConfig = {
    failureRate: 0.3,    // 30% of calls fail
    latencyMs: 5000,      // up to 5 extra seconds
    latencyRate: 0.2,     // 20% of calls get extra latency
  };

  let attempts = 0;
  let successes = 0;
  const totalRuns = 10;

  for (let i = 0; i < totalRuns; i++) {
    attempts++;
    try {
      // Simulation: in a real test, wrap the SDK call with wrapWithChaos(fn, config);
      // here the raw query is called directly for brevity
      const results: string[] = [];
      for await (const message of query("Say hello briefly", {
        allowedTools: [],
        maxTurns: 1,
      } as ClaudeCodeOptions)) {
        if ("content" in message) {
          results.push(String((message as { content: unknown }).content));
        }
      }
      successes++;
    } catch (e) {
      console.log(`[CHAOS] Run ${i + 1} failed: ${e}`);
    }
  }

  const successRate = successes / attempts;
  console.log(`\nChaos test results:`);
  console.log(`  Attempts: ${attempts}`);
  console.log(`  Successes: ${successes}`);
  console.log(`  Success rate: ${(successRate * 100).toFixed(1)}%`);
}

15. Profiling and Flame Graphs

Where the Agent Spends Its Time

flowchart LR
    subgraph TimeBreakdown["Typical time breakdown of a query"]
        LLM["Waiting for LLM response\n~60-75% of total time"]
        Tools["Executing tools\n~15-25% of total time"]
        SDK["SDK overhead\n(JSON parsing, Python)\n~2-5% of total time"]
        Network["Network and latency\n~5-10% of total time"]
    end

Profiling with cProfile and py-spy

# profiling_agent.py
"""
Performance profiling of the Python code that uses the SDK.
Identifies which part of the code (not the LLM) consumes the most time.
"""
import cProfile
import pstats
import asyncio
import io
from claude_code_sdk import query, ClaudeCodeOptions


async def agent_workload():
    """The agent's actual workload."""
    results = []
    async for message in query(
        prompt="List the files in the current directory",
        options=ClaudeCodeOptions(
            allowed_tools=["Bash"],
            max_turns=5
        )
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

        # Simulate post-message processing
        _process_message(message)

    return results


def _process_message(message) -> None:
    """Processing that could become expensive when there are many messages."""
    import json
    # Serialize for auditing (potential bottleneck if the message is large)
    try:
        _ = json.dumps(str(message))
    except Exception:
        pass


def profile_agent_sync():
    """
    Synchronous profile of the agent.
    cProfile works best with synchronous code.
    """
    profiler = cProfile.Profile()

    # Synchronous wrapper (asyncio.run creates and closes its own event loop)
    def run_sync():
        return asyncio.run(agent_workload())

    profiler.enable()
    result = run_sync()
    profiler.disable()

    # Generate the report
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative")
    stats.print_stats(30)  # Top 30 slowest functions

    print("\n=== PROFILING REPORT ===")
    print(stream.getvalue())
    return result


# Run the profiler
if __name__ == "__main__":
    profile_agent_sync()
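Beyond printing a text report, the profile can be persisted and explored interactively. The sketch below profiles a stand-in CPU-bound function (the real `agent_workload` would take its place) and dumps the raw stats to a file that viewers such as snakeviz (`pip install snakeviz`) can render as an interactive flame-style chart:

```python
import cProfile
import pstats


def busy_work() -> int:
    """Stand-in for the CPU-bound part of the agent's post-processing."""
    return sum(i * i for i in range(100_000))


profiler = cProfile.Profile()
profiler.enable()
busy_work()
profiler.disable()

# Persist the raw profile; explore it later with `snakeviz agent.prof`
profiler.dump_stats("agent.prof")

stats = pstats.Stats("agent.prof")
print(f"Recorded {stats.total_calls} calls")
```

Keeping the `.prof` files per run also lets you diff profiles between SDK versions.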

py-spy for Profiling Running Processes

# Install py-spy
pip install py-spy

# Profile a running process (by PID)
# Useful for processes already running in production
py-spy record -o agent_profile.svg --pid $(pgrep -f "python agent.py") --duration 30

# Profile from process start
py-spy record -o agent_profile.svg -- python agent.py

# Real-time top (like `top`, but for Python functions)
py-spy top --pid $(pgrep -f "python agent.py")

# Dump the current stack trace (to see where it is stuck)
py-spy dump --pid $(pgrep -f "python agent.py")

Interpreting the Flame Graph

flowchart TD
    subgraph FlameGraph["Reading a Flame Graph (SVG)"]
        direction TB
        Base["Bottom of the graph = outermost calls\n(main, asyncio.run, etc.)"]
        Middle["Middle layers = call chain"]
        Top["Flame tips = functions consuming the most CPU"]

        Base --> Middle
        Middle --> Top
    end

    subgraph Interpretation["What to look for"]
        Wide["Wide flames = slow function, or one called many times"]
        Flat["Flat, wide flame = a real bottleneck"]
        Tall["Tall, thin flame = deep call stack, not necessarily slow"]
    end

Async Profiling with austin

# austin: low-overhead profiler for async Python
pip install austin-dist

# Profile async (asyncio) code with a web UI (austin-web wraps austin)
pip install austin-web
austin-web -i 1000 python agent.py

# Alternatively, use asyncio debug mode to find slow coroutines
PYTHONASYNCIODEBUG=1 python agent.py 2>&1 | grep "took"
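The same debug mode can be enabled from code instead of the environment variable, which also lets you lower the threshold at which a callback is reported as slow. A sketch (`slow_callback_demo` is a hypothetical stand-in for agent work that blocks the loop):

```python
import asyncio
import time


async def slow_callback_demo():
    # Deliberately block the event loop: with debug mode on, asyncio logs
    # a warning like "Executing <Task ...> took 0.2xx seconds"
    time.sleep(0.2)


loop = asyncio.new_event_loop()
loop.set_debug(True)                # same effect as PYTHONASYNCIODEBUG=1
loop.slow_callback_duration = 0.1   # report anything blocking > 100 ms
try:
    loop.run_until_complete(slow_callback_demo())
finally:
    loop.close()
```

In an agent this typically flags synchronous file or subprocess work done inside message handlers, which should be moved to `asyncio.to_thread`.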

Identifying Bottlenecks in Message Processing

# message_processing_profiler.py
"""
Profiler focused on SDK message processing.
Determines whether the bottleneck is in the Python-side processing
or in waiting for the LLM.
"""
import time
import asyncio
from collections import defaultdict
from claude_code_sdk import query, ClaudeCodeOptions


class MessageTimingProfiler:
    """Measures how much time is spent on each message type."""

    def __init__(self):
        self.timings: dict[str, list[float]] = defaultdict(list)
        self.between_messages: list[float] = []
        self._last_message_time: float | None = None

    async def profile(self, prompt: str) -> dict:
        """Profiles the message processing for one query."""
        start = time.monotonic()
        message_count = 0

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(
                allowed_tools=["Read", "Bash"],
                max_turns=20
            )
        ):
            now = time.monotonic()
            message_count += 1

            # Time between messages = time spent waiting for the LLM/tools
            if self._last_message_time is not None:
                wait_time = now - self._last_message_time
                self.between_messages.append(wait_time)

            # Python processing time per message type
            msg_type = type(message).__name__
            proc_start = time.monotonic()

            # Simulate processing
            _ = str(message)

            proc_time = time.monotonic() - proc_start
            self.timings[msg_type].append(proc_time)

            self._last_message_time = time.monotonic()

        total = time.monotonic() - start
        python_time = sum(sum(v) for v in self.timings.values())
        wait_time = sum(self.between_messages)

        return {
            "total_seconds": round(total, 3),
            "python_processing_seconds": round(python_time, 3),
            "waiting_for_llm_or_tools_seconds": round(wait_time, 3),
            "python_overhead_percent": round(python_time / total * 100, 1),
            "message_count": message_count,
            "timing_by_message_type": {
                k: {
                    "count": len(v),
                    "avg_ms": round(sum(v) / len(v) * 1000, 3),
                    "max_ms": round(max(v) * 1000, 3),
                }
                for k, v in self.timings.items()
            }
        }


async def main():
    profiler = MessageTimingProfiler()
    report = await profiler.profile("List the Python files in the current directory")

    print("\n=== AGENT MESSAGE PROCESSING PROFILE ===")
    print(f"Total time: {report['total_seconds']}s")
    print(f"Python time: {report['python_processing_seconds']}s ({report['python_overhead_percent']}%)")
    print(f"Waiting for LLM/tools: {report['waiting_for_llm_or_tools_seconds']}s")
    print(f"Total messages: {report['message_count']}")
    print("\nBy message type:")
    for msg_type, timing in report['timing_by_message_type'].items():
        print(f"  {msg_type}: {timing['count']} msgs, avg={timing['avg_ms']}ms, max={timing['max_ms']}ms")


if __name__ == "__main__":
    asyncio.run(main())