Capítulo 6: Sistema de Hooks — Control Total del Agente

Por: Artiko
claudeagent-sdkhooksvalidacionauditoriaseguridadobservabilidad

Capítulo 6: Sistema de Hooks — Control Total del Agente

¿Qué son los hooks?

Los hooks son callbacks que el SDK llama en puntos específicos del ciclo de vida agentic. Son el mecanismo que te da control total sobre lo que hace el agente: puedes observar cada acción, modificar entradas y salidas, bloquear operaciones peligrosas, o disparar efectos secundarios como notificaciones y métricas.

El ciclo de vida del agente con hooks

flowchart TD
    START(["Inicio: query()"])
    PROMPT["LLM procesa el prompt"]
    RESP["LLM genera respuesta"]
    TOOL_CALL{"¿La respuesta\nincluye tool call?"}
    PRE["PreToolUse Hook\n(ANTES de ejecutar)"]
    DECISION{"¿El hook\npermite ejecutar?"}
    EXEC["Ejecutar herramienta"]
    POST["PostToolUse Hook\n(DESPUÉS de ejecutar)"]
    MODIFY{"¿El hook\nmodifica output?"}
    SEND_RESULT["Enviar resultado al LLM"]
    NOTIF["Notification Hook\n(mensajes al usuario)"]
    MAX_TURNS{"¿Máximo de\nturns alcanzado?"}
    STOP["Stop Hook\n(agente terminó)"]
    END(["Fin: ResultMessage"])

    START --> PROMPT
    PROMPT --> RESP
    RESP --> TOOL_CALL
    TOOL_CALL -->|"Sí"| PRE
    TOOL_CALL -->|"No (mensaje final)"| NOTIF
    PRE --> DECISION
    DECISION -->|"deny"| SEND_RESULT
    DECISION -->|"allow"| EXEC
    EXEC --> POST
    POST --> MODIFY
    MODIFY -->|"override output"| SEND_RESULT
    MODIFY -->|"original output"| SEND_RESULT
    SEND_RESULT --> PROMPT
    NOTIF --> MAX_TURNS
    MAX_TURNS -->|"Sí"| STOP
    MAX_TURNS -->|"No"| PROMPT
    STOP --> END

    style PRE fill:#FF9800,color:#000
    style POST fill:#4CAF50,color:#fff
    style STOP fill:#F44336,color:#fff
    style NOTIF fill:#2196F3,color:#fff

Lista completa de eventos de hook

EventoCuándo se ejecutaPuede modificar
PreToolUseAntes de ejecutar una herramientaInput de la herramienta, puede denegar
PostToolUseDespués de ejecutar una herramientaOutput de la herramienta
StopCuando el agente termina (éxito o max_turns)No (solo observación)
NotificationCuando el agente envía mensajes informativosPuede redirigir notificaciones

Qué retorna un hook

Cada hook retorna un diccionario que le indica al SDK cómo proceder:

# 1. Continuar sin cambios
return {}

# 2. Denegar la ejecución de la herramienta (PreToolUse)
return {
    "hookSpecificOutput": {
        "permissionDecision": "deny",
        "permissionDecisionReason": "No se permiten modificaciones a archivos .env"
    }
}

# 3. Modificar el input antes de ejecutar (PreToolUse)
return {
    "hookSpecificOutput": {
        "overrideToolUseInput": {
            "file_path": "/ruta/absoluta/corregida.py",
            "content": "# Contenido modificado por el hook"
        }
    }
}

# 4. Modificar el output después de ejecutar (PostToolUse)
return {
    "hookSpecificOutput": {
        "overrideToolUseOutput": {
            "type": "tool_result",
            "content": "Output modificado por el hook PostToolUse"
        }
    }
}

HookMatcher — Configuración

Un HookMatcher combina un patrón regex con una lista de hooks. El SDK evalúa el nombre de la herramienta contra el matcher y ejecuta los hooks correspondientes.

Signature completa

Python:

from claude_code_sdk import HookMatcher, PreToolUseHook, PostToolUseHook

HookMatcher(
    matcher="^(Write|Edit|MultiEdit)$",  # Regex para tool_name
    hooks=[                               # Lista de callbacks
        PreToolUseHook(mi_hook_pre),
        PostToolUseHook(mi_hook_post)
    ]
)

TypeScript:

import { HookMatcher, PreToolUseHook } from "@anthropic-ai/claude-code-sdk";

const matcher: HookMatcher = {
  matcher: "^(Write|Edit|MultiEdit)$",
  hooks: [
    { type: "PreToolUse", handler: miHookPre },
    { type: "PostToolUse", handler: miHookPost }
  ]
};

Patrones matcher comunes

from claude_code_sdk import HookMatcher, PreToolUseHook

# Todos los tools
HookMatcher(matcher=".*", hooks=[...])

# Solo herramientas de escritura de archivos
HookMatcher(matcher="^(Write|Edit|MultiEdit)$", hooks=[...])

# Solo herramientas de bash y comandos
HookMatcher(matcher="^(Bash|Computer)$", hooks=[...])

# Todas las herramientas MCP
HookMatcher(matcher="^mcp__.*$", hooks=[...])

# Un servidor MCP específico
HookMatcher(matcher="^mcp__postgres__.*$", hooks=[...])

# Una herramienta específica
HookMatcher(matcher="^Bash$", hooks=[...])

# Cualquier herramienta de lectura
HookMatcher(matcher="^(Read|Glob|Grep|LS)$", hooks=[...])

Múltiples matchers en un agente

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook

async def hook_escritura(event):
    """Bloquea escritura en paths sensibles."""
    tool_input = event.tool_input
    if "file_path" in tool_input:
        path = tool_input["file_path"]
        if ".env" in path or "secrets" in path.lower():
            return {
                "hookSpecificOutput": {
                    "permissionDecision": "deny",
                    "permissionDecisionReason": f"Escritura en '{path}' no permitida"
                }
            }
    return {}

async def hook_bash(event):
    """Bloquea comandos bash peligrosos."""
    comando = event.tool_input.get("command", "")
    if "rm -rf" in comando or "sudo" in comando:
        return {
            "hookSpecificOutput": {
                "permissionDecision": "deny",
                "permissionDecisionReason": f"Comando peligroso bloqueado: {comando[:50]}"
            }
        }
    return {}

async def hook_log_todo(event):
    """Registra todas las herramientas ejecutadas."""
    print(f"[LOG] Tool: {event.tool_name}")
    return {}

async def main():
    options = ClaudeCodeOptions(
        hooks=[
            # Hook 1: Proteger archivos sensibles (solo escrituras)
            HookMatcher(
                matcher="^(Write|Edit|MultiEdit)$",
                hooks=[PreToolUseHook(hook_escritura)]
            ),
            # Hook 2: Validar comandos bash
            HookMatcher(
                matcher="^Bash$",
                hooks=[PreToolUseHook(hook_bash)]
            ),
            # Hook 3: Log de todo (todos los tools)
            HookMatcher(
                matcher=".*",
                hooks=[PreToolUseHook(hook_log_todo)]
            )
        ]
    )

    async for event in query(
        prompt="Crea un archivo test.py y ejecuta pytest",
        options=options
    ):
        pass

asyncio.run(main())

TypeScript equivalente:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const options: ClaudeCodeOptions = {
  hooks: [
    {
      matcher: "^(Write|Edit|MultiEdit)$",
      hooks: [{
        type: "PreToolUse",
        handler: async (event) => {
          const path = event.toolInput?.file_path as string;
          if (path?.includes(".env")) {
            return {
              hookSpecificOutput: {
                permissionDecision: "deny",
                permissionDecisionReason: "Escritura en .env no permitida"
              }
            };
          }
          return {};
        }
      }]
    },
    {
      matcher: "^Bash$",
      hooks: [{
        type: "PreToolUse",
        handler: async (event) => {
          const cmd = event.toolInput?.command as string;
          if (cmd?.includes("rm -rf")) {
            return {
              hookSpecificOutput: {
                permissionDecision: "deny",
                permissionDecisionReason: "rm -rf bloqueado por política"
              }
            };
          }
          return {};
        }
      }]
    }
  ]
};

PreToolUse — Interceptar antes de ejecutar

PreToolUse se ejecuta antes de que la herramienta corra. Es el punto donde puedes:

Estructura del evento PreToolUse

@dataclass
class PreToolUseEvent:
    tool_name: str        # Nombre de la herramienta: "Bash", "Write", "mcp__..."
    tool_input: dict      # Parámetros que usará la herramienta
    tool_use_id: str      # ID único de esta ejecución
    session_id: str       # ID de la sesión del agente

Ejemplo 1: Validar que no se editen archivos .env

import asyncio
import os
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook

ARCHIVOS_PROTEGIDOS = {
    ".env", ".env.local", ".env.production", ".env.development",
    "secrets.json", "credentials.json", "private.key",
    ".npmrc", ".pypirc"
}

async def proteger_archivos_sensibles(event) -> dict:
    """Bloquea escritura en archivos con secretos o configuración sensible."""
    tool_input = event.tool_input
    archivo = tool_input.get("file_path", "")

    if not archivo:
        return {}

    # Verificar nombre de archivo
    nombre_archivo = Path(archivo).name
    if nombre_archivo in ARCHIVOS_PROTEGIDOS:
        return {
            "hookSpecificOutput": {
                "permissionDecision": "deny",
                "permissionDecisionReason": (
                    f"BLOQUEADO: El archivo '{nombre_archivo}' está protegido. "
                    f"No se permite modificar archivos de credenciales o configuración sensible. "
                    f"Si necesitas cambiar variables de entorno, hazlo manualmente."
                )
            }
        }

    # Verificar extensiones peligrosas
    extension = Path(archivo).suffix.lower()
    if extension in {".key", ".pem", ".p12", ".pfx", ".cert"}:
        return {
            "hookSpecificOutput": {
                "permissionDecision": "deny",
                "permissionDecisionReason": (
                    f"BLOQUEADO: No se permite modificar archivos de certificados "
                    f"o claves privadas (extensión: {extension})"
                )
            }
        }

    # Verificar paths dentro de directorios de secretos
    partes_path = Path(archivo).parts
    directorios_prohibidos = {"secrets", "credentials", ".ssh", "certs", "private"}
    for parte in partes_path[:-1]:  # Excluir el nombre del archivo
        if parte.lower() in directorios_prohibidos:
            return {
                "hookSpecificOutput": {
                    "permissionDecision": "deny",
                    "permissionDecisionReason": (
                        f"BLOQUEADO: El directorio '{parte}' está protegido"
                    )
                }
            }

    return {}


async def main():
    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher="^(Write|Edit|MultiEdit)$",
                hooks=[PreToolUseHook(proteger_archivos_sensibles)]
            )
        ]
    )

    async for event in query(
        prompt="Actualiza el archivo .env con la nueva API key: KEY=abc123",
        options=options
    ):
        if hasattr(event, 'type') and event.type == 'result':
            print(f"Resultado: {event.result}")

asyncio.run(main())

Ejemplo 2: Bloquear comandos bash peligrosos

import re
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook

# Patrones de comandos peligrosos con su razón
PATRONES_PELIGROSOS = [
    (r'\brm\s+-[rRfF]*\s*/', "rm recursivo en directorio raíz"),
    (r'\brm\s+-rf\b', "rm -rf (borrado recursivo forzado)"),
    (r'\bdd\s+if=', "comando dd (puede sobrescribir discos)"),
    (r'\b:(){ :|:& };:', "fork bomb"),
    (r'\bchmod\s+777\s+/', "chmod 777 en directorio raíz"),
    (r'\bsudo\s+rm', "sudo rm (borrado con privilegios elevados)"),
    (r'\bformat\s+[cCdD]:', "format de disco (Windows)"),
    (r'\bmkfs\b', "mkfs (formateo de sistema de archivos)"),
    (r'\bshred\b', "shred (borrado seguro irreversible)"),
    (r'\bwget\s+.*\s*\|\s*(?:bash|sh|zsh)', "descarga y ejecución de script remoto"),
    (r'\bcurl\s+.*\s*\|\s*(?:bash|sh|zsh)', "descarga y ejecución de script remoto"),
]

# Comandos en whitelist (siempre permitidos sin importar los patrones anteriores)
COMANDOS_SEGUROS_WHITELIST = {
    "ls", "cat", "echo", "pwd", "date", "whoami",
    "git status", "git log", "git diff"
}


async def validar_bash_seguro(event) -> dict:
    """Valida comandos bash contra lista de patrones peligrosos."""
    comando = event.tool_input.get("command", "").strip()

    if not comando:
        return {}

    # Verificar whitelist (permitir incondicionalmente)
    for cmd_seguro in COMANDOS_SEGUROS_WHITELIST:
        if comando.startswith(cmd_seguro):
            return {}

    # Verificar patrones peligrosos
    for patron, descripcion in PATRONES_PELIGROSOS:
        if re.search(patron, comando, re.IGNORECASE):
            return {
                "hookSpecificOutput": {
                    "permissionDecision": "deny",
                    "permissionDecisionReason": (
                        f"Comando bloqueado por política de seguridad: {descripcion}.\n"
                        f"Comando: {comando[:100]}"
                    )
                }
            }

    # Log de comandos que sí se ejecutan
    print(f"[BASH PERMITIDO] {comando[:80]}")
    return {}

TypeScript equivalente:

const patronesPeligrosos = [
  { patron: /\brm\s+-rf\b/, razon: "rm -rf bloqueado" },
  { patron: /\bsudo\s+rm/, razon: "sudo rm bloqueado" },
  { patron: /curl.*\|\s*(bash|sh)/, razon: "pipe a shell bloqueado" },
];

const hookBashSeguro = async (event: any) => {
  const comando: string = event.toolInput?.command ?? "";

  for (const { patron, razon } of patronesPeligrosos) {
    if (patron.test(comando)) {
      return {
        hookSpecificOutput: {
          permissionDecision: "deny",
          permissionDecisionReason: `Bloqueado: ${razon}. Comando: ${comando.slice(0, 80)}`
        }
      };
    }
  }

  return {};
};

Ejemplo 3: Transformar paths relativos a absolutos

import asyncio
import os
from pathlib import Path

async def normalizar_paths(event) -> dict:
    """Convierte paths relativos a absolutos antes de ejecutar herramientas de archivo."""
    tool_input = event.tool_input
    modificado = dict(tool_input)
    cambio_realizado = False

    directorio_trabajo = os.getcwd()

    for campo in ["file_path", "path", "source_path", "destination_path"]:
        if campo in tool_input and tool_input[campo]:
            path_original = tool_input[campo]
            path_obj = Path(path_original)

            if not path_obj.is_absolute():
                path_absoluto = str(Path(directorio_trabajo) / path_obj)
                modificado[campo] = path_absoluto
                cambio_realizado = True
                print(f"[PATH NORMALIZADO] {path_original}{path_absoluto}")

    if cambio_realizado:
        return {
            "hookSpecificOutput": {
                "overrideToolUseInput": modificado
            }
        }

    return {}

Ejemplo 4: Rate limiting por herramienta

import asyncio
import time
from collections import defaultdict
from typing import Dict, List

class RateLimiter:
    def __init__(self):
        # {tool_name: [timestamps de los últimos usos]}
        self._historial: Dict[str, List[float]] = defaultdict(list)

    # Límites por herramienta: (max_llamadas, ventana_segundos)
    LIMITES = {
        "Bash": (10, 60),          # Máximo 10 Bash por minuto
        "Write": (20, 60),          # Máximo 20 escrituras por minuto
        "mcp__postgres__.*": (50, 60),  # Máximo 50 queries DB por minuto
        ".*": (100, 60)             # Global: 100 herramientas por minuto
    }

    async def __call__(self, event) -> dict:
        tool_name = event.tool_name
        ahora = time.time()

        # Determinar límite aplicable
        max_llamadas, ventana = 100, 60  # Default
        for patron, (max_c, vent) in self.LIMITES.items():
            import re
            if re.match(patron, tool_name):
                max_llamadas, ventana = max_c, vent
                break

        # Limpiar historial antiguo
        self._historial[tool_name] = [
            t for t in self._historial[tool_name]
            if ahora - t < ventana
        ]

        # Verificar límite
        if len(self._historial[tool_name]) >= max_llamadas:
            tiempo_espera = ventana - (ahora - self._historial[tool_name][0])
            return {
                "hookSpecificOutput": {
                    "permissionDecision": "deny",
                    "permissionDecisionReason": (
                        f"Rate limit alcanzado para '{tool_name}': "
                        f"{max_llamadas} llamadas/{ventana}s. "
                        f"Espera {tiempo_espera:.0f}s."
                    )
                }
            }

        self._historial[tool_name].append(ahora)
        return {}


# Uso
rate_limiter = RateLimiter()

options = ClaudeCodeOptions(
    hooks=[
        HookMatcher(
            matcher=".*",
            hooks=[PreToolUseHook(rate_limiter)]
        )
    ]
)

PostToolUse — Interceptar después de ejecutar

PostToolUse se ejecuta después de que la herramienta termina, con acceso tanto al input como al output. Es el punto ideal para logging, verificaciones post-ejecución y transformación de resultados.

Estructura del evento PostToolUse

@dataclass
class PostToolUseEvent:
    tool_name: str      # Nombre de la herramienta
    tool_input: dict    # Los parámetros que se usaron
    tool_output: dict   # El resultado de la herramienta
    tool_use_id: str    # ID único de esta ejecución
    session_id: str     # ID de la sesión

Ejemplo 1: Log de archivos editados

import asyncio
import json
from datetime import datetime
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PostToolUseHook

class RegistroEdiciones:
    def __init__(self, archivo_log: str = "/tmp/ediciones_agente.jsonl"):
        self.archivo_log = archivo_log
        self._ediciones: list = []

    async def registrar_edicion(self, event) -> dict:
        """Registra en log cada archivo que el agente modifica."""
        tool_input = event.tool_input
        tool_output = event.tool_output
        tool_name = event.tool_name

        entrada_log = {
            "timestamp": datetime.now().isoformat(),
            "tool": tool_name,
            "session_id": event.session_id,
            "archivo": tool_input.get("file_path", "desconocido"),
            "exito": not tool_output.get("isError", False),
            "tool_use_id": event.tool_use_id
        }

        # Agregar detalles específicos por tipo de herramienta
        if tool_name == "Write":
            entrada_log["tipo"] = "crear/sobrescribir"
            entrada_log["lineas"] = tool_input.get("content", "").count("\n")
        elif tool_name in ("Edit", "MultiEdit"):
            entrada_log["tipo"] = "editar"

        self._ediciones.append(entrada_log)

        # Escribir en archivo JSONL
        with open(self.archivo_log, "a") as f:
            f.write(json.dumps(entrada_log, ensure_ascii=False) + "\n")

        print(f"[EDICIÓN REGISTRADA] {entrada_log['archivo']} ({tool_name})")
        return {}  # No modificar el output

    def resumen(self) -> str:
        if not self._ediciones:
            return "No se editaron archivos"
        archivos_unicos = {e["archivo"] for e in self._ediciones}
        return f"Archivos editados: {len(archivos_unicos)} únicos, {len(self._ediciones)} operaciones totales"

Ejemplo 2: Verificar que los tests pasan después de Edit

import asyncio
import subprocess
from pathlib import Path

async def verificar_tests_post_edit(event) -> dict:
    """Ejecuta tests automáticamente después de editar archivos Python."""
    tool_input = event.tool_input
    archivo = tool_input.get("file_path", "")

    # Solo para archivos Python que no sean tests
    if not archivo.endswith(".py") or "test_" in Path(archivo).name:
        return {}

    # Intentar encontrar y correr el archivo de test correspondiente
    ruta = Path(archivo)
    directorio = ruta.parent
    nombre_base = ruta.stem
    archivo_test = directorio / f"test_{nombre_base}.py"

    if not archivo_test.exists():
        return {}

    print(f"[AUTO-TEST] Ejecutando {archivo_test}...")
    resultado = subprocess.run(
        ["python", "-m", "pytest", str(archivo_test), "-v", "--tb=short"],
        capture_output=True,
        text=True,
        timeout=30
    )

    if resultado.returncode != 0:
        # Tests fallaron: informar al LLM
        mensaje_error = (
            f"ADVERTENCIA: Los tests fallaron después de editar {archivo}.\n"
            f"Salida de pytest:\n{resultado.stdout[-500:]}"  # últimas 500 chars
        )
        print(f"[AUTO-TEST FALLÓ] {mensaje_error}")

        return {
            "hookSpecificOutput": {
                "overrideToolUseOutput": {
                    "type": "tool_result",
                    "content": (
                        f"Archivo editado exitosamente, PERO los tests fallaron:\n"
                        f"{resultado.stdout[-500:]}"
                    )
                }
            }
        }

    print(f"[AUTO-TEST PASÓ] {archivo_test}")
    return {}

Ejemplo 3: Notificar por Slack cuando se completa una tarea

import asyncio
import httpx
import os
from datetime import datetime

async def notificar_slack_post_bash(event) -> dict:
    """Envía notificación a Slack cuando se ejecuta un comando Bash exitosamente."""
    tool_input = event.tool_input
    tool_output = event.tool_output
    comando = tool_input.get("command", "")

    # Solo notificar comandos de deploy o CI
    palabras_clave_deploy = ["deploy", "docker push", "kubectl apply", "terraform apply"]
    es_deploy = any(palabra in comando.lower() for palabra in palabras_clave_deploy)

    if not es_deploy:
        return {}

    exito = not tool_output.get("isError", False)
    emoji = "✅" if exito else "❌"
    estado = "exitoso" if exito else "FALLIDO"

    mensaje = {
        "text": f"{emoji} Deploy {estado}",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{emoji} Deploy {estado}*\n`{comando[:100]}`"
                }
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": f"Hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Session: {event.session_id[:8]}"
                    }
                ]
            }
        ]
    }

    slack_webhook = os.environ.get("SLACK_WEBHOOK_URL")
    if slack_webhook:
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                await client.post(slack_webhook, json=mensaje)
        except Exception as e:
            print(f"[SLACK] Error enviando notificación: {e}")

    return {}


async def main():
    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher="^Bash$",
                hooks=[PostToolUseHook(notificar_slack_post_bash)]
            )
        ]
    )

    async for event in query(
        prompt="Ejecuta el deployment con: ./deploy.sh --zero-downtime",
        options=options
    ):
        pass

TypeScript equivalente:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const notificarSlack = async (event: any) => {
  const comando: string = event.toolInput?.command ?? "";
  const esDeploy = ["deploy", "docker push", "kubectl"].some(k => comando.includes(k));

  if (!esDeploy) return {};

  const webhook = process.env.SLACK_WEBHOOK_URL;
  if (!webhook) return {};

  try {
    await fetch(webhook, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ text: `Deploy ejecutado: \`${comando.slice(0, 80)}\`` })
    });
  } catch (e) {
    console.error("Error Slack:", e);
  }

  return {};
};

const options: ClaudeCodeOptions = {
  hooks: [{
    matcher: "^Bash$",
    hooks: [{ type: "PostToolUse", handler: notificarSlack }]
  }]
};

Stop hook — Limpieza y reporte final

El Stop hook se ejecuta cuando el agente termina su trabajo, ya sea porque llegó a una respuesta final, alcanzó el máximo de turns, o fue interrumpido.

Casos de uso del Stop hook

import asyncio
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
from claude_code_sdk import query, ClaudeCodeOptions, StopHook

class ReporteFinal:
    def __init__(self, email_destino: str):
        self.email = email_destino
        self._inicio = datetime.now()
        self._acciones: list = []

    def registrar_accion(self, tool_name: str):
        self._acciones.append(tool_name)

    async def enviar_reporte(self, event) -> dict:
        """Envía reporte por email cuando el agente termina."""
        duracion = (datetime.now() - self._inicio).total_seconds()

        # Contar herramientas usadas
        from collections import Counter
        conteo = Counter(self._acciones)

        reporte = {
            "session_id": event.session_id,
            "duracion_segundos": duracion,
            "total_acciones": len(self._acciones),
            "herramientas_usadas": dict(conteo),
            "resultado": event.result if hasattr(event, 'result') else "N/A",
            "timestamp": datetime.now().isoformat()
        }

        print(f"\n{'='*50}")
        print(f"REPORTE FINAL DE SESIÓN")
        print(f"Duración: {duracion:.1f}s")
        print(f"Acciones: {len(self._acciones)}")
        print(f"Herramientas: {dict(conteo)}")
        print('='*50)

        # Guardar reporte en archivo
        with open(f"/tmp/reporte_{event.session_id[:8]}.json", "w") as f:
            json.dump(reporte, f, indent=2, ensure_ascii=False)

        return {}


async def main():
    reporte = ReporteFinal("[email protected]")

    async def hook_log_acciones(event):
        reporte.registrar_accion(event.tool_name)
        return {}

    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher=".*",
                hooks=[PreToolUseHook(hook_log_acciones)]
            )
        ],
        on_stop=StopHook(reporte.enviar_reporte)
    )

    async for event in query(
        prompt="Crea un módulo Python completo con tests y documentación",
        options=options
    ):
        pass

Cleanup de recursos

import asyncio
from claude_code_sdk import ClaudeCodeOptions, StopHook

class GestorRecursos:
    def __init__(self):
        self._conexiones_db = []
        self._archivos_temp = []
        self._tareas_fondo = []

    def registrar_conexion(self, conn):
        self._conexiones_db.append(conn)

    def registrar_archivo_temp(self, path: str):
        self._archivos_temp.append(path)

    async def limpiar_todo(self, event) -> dict:
        """Limpia todos los recursos al terminar la sesión."""
        import os

        # Cerrar conexiones DB
        for conn in self._conexiones_db:
            try:
                conn.close()
                print(f"[CLEANUP] Conexión DB cerrada")
            except Exception as e:
                print(f"[CLEANUP] Error cerrando conexión: {e}")

        # Eliminar archivos temporales
        for path in self._archivos_temp:
            try:
                os.remove(path)
                print(f"[CLEANUP] Archivo temporal eliminado: {path}")
            except FileNotFoundError:
                pass
            except Exception as e:
                print(f"[CLEANUP] Error eliminando {path}: {e}")

        # Cancelar tareas de fondo
        for tarea in self._tareas_fondo:
            if not tarea.done():
                tarea.cancel()

        print(f"[CLEANUP] Sesión {event.session_id[:8]} terminada")
        return {}

Notification hook — Redirigir mensajes del agente

El agente puede enviar notificaciones informativas mientras trabaja (progreso, advertencias, etc.). El Notification hook intercepta estos mensajes.

Configuración y uso

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, NotificationHook

async def redirigir_a_slack(event) -> dict:
    """Redirige las notificaciones del agente a un canal de Slack."""
    mensaje = event.message if hasattr(event, 'message') else str(event)

    import httpx
    import os

    webhook = os.environ.get("SLACK_WEBHOOK_URL")
    if webhook:
        try:
            async with httpx.AsyncClient(timeout=3.0) as client:
                await client.post(webhook, json={
                    "text": f"🤖 Agente: {mensaje}"
                })
        except Exception:
            pass

    return {}

async def notificacion_a_websocket(event) -> dict:
    """Envía notificaciones vía WebSocket para actualización en tiempo real."""
    import websockets
    import json

    mensaje_ws = json.dumps({
        "tipo": "notificacion_agente",
        "mensaje": str(event),
        "timestamp": asyncio.get_event_loop().time()
    })

    try:
        async with websockets.connect("ws://localhost:8765") as ws:
            await ws.send(mensaje_ws)
    except Exception:
        pass

    return {}


async def main():
    options = ClaudeCodeOptions(
        notification_hook=NotificationHook(redirigir_a_slack)
    )

    async for event in query(
        prompt="Implementa el módulo de autenticación completo",
        options=options
    ):
        pass

Context en hooks — Estado compartido entre hooks

El contexto del hook contiene información sobre la sesión actual que puedes usar para decisiones más inteligentes.

Qué contiene el contexto

@dataclass
class HookContext:
    session_id: str           # ID único de esta sesión del agente
    # Los hooks pueden también acceder al historial via closures

Compartir estado entre hooks con clase

import asyncio
from pathlib import Path
from datetime import datetime
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook

class ContextoHooks:
    """Estado compartido entre múltiples hooks de la misma sesión."""

    def __init__(self):
        self.archivos_tocados: set = set()
        self.herramientas_usadas: list = []
        self.inicio_sesion = datetime.now()
        self.ultimo_error: str = ""

    async def pre_hook(self, event) -> dict:
        """Antes: registrar intención."""
        self.herramientas_usadas.append({
            "tool": event.tool_name,
            "timestamp": datetime.now().isoformat(),
            "input_preview": str(event.tool_input)[:100]
        })

        # Lógica de contexto: si ya tocamos demasiados archivos, advertir
        if len(self.archivos_tocados) > 50:
            print(f"[CONTEXTO] Advertencia: el agente ha tocado {len(self.archivos_tocados)} archivos")

        return {}

    async def post_hook(self, event) -> dict:
        """Después: registrar resultado."""
        # Registrar archivos tocados
        archivo = event.tool_input.get("file_path", "")
        if archivo:
            self.archivos_tocados.add(archivo)

        if event.tool_output.get("isError"):
            self.ultimo_error = f"{event.tool_name}: {str(event.tool_output)[:200]}"

        return {}

    def generar_resumen(self) -> dict:
        duracion = (datetime.now() - self.inicio_sesion).total_seconds()
        return {
            "duracion_segundos": duracion,
            "total_operaciones": len(self.herramientas_usadas),
            "archivos_unicos": len(self.archivos_tocados),
            "ultimo_error": self.ultimo_error
        }


# Uso
ctx = ContextoHooks()

options = ClaudeCodeOptions(
    hooks=[
        HookMatcher(
            matcher=".*",
            hooks=[
                PreToolUseHook(ctx.pre_hook),
                PostToolUseHook(ctx.post_hook)
            ]
        )
    ]
)

Hooks asíncronos

Todos los hooks deben ser funciones async. El SDK los espera (await) antes de continuar. Esto habilita operaciones IO sin bloquear.

Operaciones IO en hooks

import asyncio
import aiofiles
import httpx
from datetime import datetime

async def hook_log_a_archivo_async(event) -> dict:
    """Log asíncrono: no bloquea el hilo principal."""
    entrada = f"{datetime.now().isoformat()} | {event.tool_name} | {event.tool_use_id}\n"

    async with aiofiles.open("/tmp/agente_audit.log", "a") as f:
        await f.write(entrada)

    return {}


async def hook_verificar_api_externa(event) -> dict:
    """Verifica en API externa si la acción está permitida."""
    tool_name = event.tool_name
    tool_input = event.tool_input

    try:
        async with httpx.AsyncClient(timeout=2.0) as client:
            response = await client.post(
                "https://api.politicas.empresa.com/verificar",
                json={
                    "accion": tool_name,
                    "input": str(tool_input)[:500],
                    "usuario": "agente-automatico"
                }
            )
            datos = response.json()

            if not datos.get("permitido", True):
                return {
                    "hookSpecificOutput": {
                        "permissionDecision": "deny",
                        "permissionDecisionReason": datos.get(
                            "razon", "Acción no permitida por política empresarial"
                        )
                    }
                }
    except (httpx.TimeoutException, httpx.ConnectError):
        # Si la API de políticas no está disponible, permitir (fail-open)
        # O cambiar a fail-closed según tu política de seguridad
        print(f"[ADVERTENCIA] API de políticas no disponible para {tool_name}")

    return {}

Timeout de hooks

import asyncio

async def hook_con_timeout(event) -> dict:
    """Hook con timeout propio para evitar bloquear al agente."""
    try:
        resultado = await asyncio.wait_for(
            mi_logica_async(event),
            timeout=5.0  # máximo 5 segundos
        )
        return resultado
    except asyncio.TimeoutError:
        print(f"[WARNING] Hook timeout para {event.tool_name}, permitiendo continuar")
        return {}  # Fail-open: permitir si el hook tarda demasiado


async def mi_logica_async(event) -> dict:
    # Lógica que podría tardar...
    await asyncio.sleep(0)  # Ceder control
    return {}

Composición de hooks — Pipeline de validaciones

Múltiples hooks encadenados

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook

# Cada validador retorna (permitido: bool, razon: str)
class ValidadorPipeline:
    def __init__(self, *validadores):
        self.validadores = validadores

    async def __call__(self, event) -> dict:
        """Ejecuta todos los validadores en secuencia. El primero que falla, detiene el pipeline."""
        for validador in self.validadores:
            resultado = await validador(event)
            # Si algún validador retorna deny, detener aquí
            if resultado.get("hookSpecificOutput", {}).get("permissionDecision") == "deny":
                print(f"[PIPELINE] Bloqueado por: {validador.__name__}")
                return resultado
        return {}


# Validadores individuales (cada uno hace UNA cosa)
async def validar_no_env(event) -> dict:
    archivo = event.tool_input.get("file_path", "")
    if ".env" in archivo:
        return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "No .env"}}
    return {}

async def validar_no_produccion(event) -> dict:
    archivo = event.tool_input.get("file_path", "")
    if "/produccion/" in archivo or "/prod/" in archivo:
        return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "No producción"}}
    return {}

async def validar_tamaño_archivo(event) -> dict:
    contenido = event.tool_input.get("content", "")
    if len(contenido) > 100_000:  # 100KB
        return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "Archivo muy grande"}}
    return {}

async def validar_encoding_utf8(event) -> dict:
    contenido = event.tool_input.get("content", "")
    try:
        contenido.encode("utf-8")
    except UnicodeEncodeError:
        return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "Encoding inválido"}}
    return {}


# Combinar todos en un pipeline
pipeline_escritura = ValidadorPipeline(
    validar_no_env,
    validar_no_produccion,
    validar_tamaño_archivo,
    validar_encoding_utf8
)

options = ClaudeCodeOptions(
    hooks=[
        HookMatcher(
            matcher="^(Write|Edit|MultiEdit)$",
            hooks=[PreToolUseHook(pipeline_escritura)]
        )
    ]
)

Hook manager pattern

from typing import List, Callable, Dict, Any
import asyncio

class HookManager:
    """Gestiona colecciones de hooks con prioridades y condiciones."""

    def __init__(self):
        self._pre_hooks: List[tuple[int, Callable]] = []
        self._post_hooks: List[tuple[int, Callable]] = []

    def agregar_pre_hook(self, hook: Callable, prioridad: int = 0):
        """Agrega un pre-hook. Mayor prioridad = se ejecuta primero."""
        self._pre_hooks.append((prioridad, hook))
        self._pre_hooks.sort(key=lambda x: -x[0])

    def agregar_post_hook(self, hook: Callable, prioridad: int = 0):
        self._post_hooks.append((prioridad, hook))
        self._post_hooks.sort(key=lambda x: -x[0])

    async def ejecutar_pre(self, event) -> Dict[str, Any]:
        for _, hook in self._pre_hooks:
            resultado = await hook(event)
            if resultado.get("hookSpecificOutput", {}).get("permissionDecision") == "deny":
                return resultado
        return {}

    async def ejecutar_post(self, event) -> Dict[str, Any]:
        for _, hook in self._post_hooks:
            resultado = await hook(event)
            if resultado:
                return resultado
        return {}


# Uso del HookManager
manager = HookManager()
manager.agregar_pre_hook(validar_no_env, prioridad=100)        # Alta prioridad
manager.agregar_pre_hook(validar_no_produccion, prioridad=90)
manager.agregar_pre_hook(validar_tamaño_archivo, prioridad=50)
manager.agregar_pre_hook(hook_log_todo, prioridad=0)            # Baja prioridad

Sistema de permisos con hooks

Implementación completa de permisos empresariales

import asyncio
import json
from enum import Enum
from dataclasses import dataclass, field
from typing import Set, Dict, Optional, List
from pathlib import Path

class Permiso(Enum):
    LEER = "leer"
    ESCRIBIR = "escribir"
    EJECUTAR = "ejecutar"
    BORRAR = "borrar"
    ADMIN = "admin"


@dataclass
class Politica:
    """Define qué puede y no puede hacer el agente."""
    herramientas_permitidas: Set[str] = field(default_factory=set)
    herramientas_prohibidas: Set[str] = field(default_factory=set)
    paths_escritura_permitidos: List[str] = field(default_factory=list)
    paths_lectura_permitidos: List[str] = field(default_factory=list)
    comandos_bash_permitidos: List[str] = field(default_factory=list)
    max_archivos_por_sesion: int = 100
    max_llamadas_bash: int = 50

    @classmethod
    def para_desarrollador(cls) -> "Politica":
        """Política permisiva para entorno de desarrollo."""
        return cls(
            herramientas_permitidas={".*"},  # Todas
            herramientas_prohibidas={"Computer"},  # Except control de GUI
            paths_escritura_permitidos=["/home", "/tmp", "/workspace"],
            paths_lectura_permitidos=[".*"],
            max_archivos_por_sesion=500
        )

    @classmethod
    def para_produccion(cls) -> "Politica":
        """Política restrictiva para entorno de producción."""
        return cls(
            herramientas_permitidas={"Read", "Bash"},  # Solo lectura + bash limitado
            herramientas_prohibidas={"Write", "Edit", "MultiEdit", "Computer"},
            paths_escritura_permitidos=["/tmp"],
            paths_lectura_permitidos=["/app", "/var/log"],
            comandos_bash_permitidos=["ls", "grep", "cat", "tail", "ps", "df", "free"],
            max_archivos_por_sesion=10,
            max_llamadas_bash=20
        )


class SistemaPermisos:
    def __init__(self, politica: Politica):
        self.politica = politica
        self._archivos_escritos: int = 0
        self._llamadas_bash: int = 0

    async def verificar(self, event) -> dict:
        """Verifica si la acción está permitida por la política."""
        tool_name = event.tool_name
        tool_input = event.tool_input

        # 1. Verificar herramientas prohibidas
        import re
        for prohibida in self.politica.herramientas_prohibidas:
            if re.match(prohibida, tool_name):
                return self._denegar(f"Herramienta '{tool_name}' está prohibida por política")

        # 2. Verificar límite de archivos escritos
        if tool_name in ("Write", "Edit", "MultiEdit"):
            if self._archivos_escritos >= self.politica.max_archivos_por_sesion:
                return self._denegar(
                    f"Límite de {self.politica.max_archivos_por_sesion} "
                    f"archivos por sesión alcanzado"
                )

            # Verificar paths permitidos para escritura
            archivo = tool_input.get("file_path", "")
            if archivo and not self._path_en_lista(archivo, self.politica.paths_escritura_permitidos):
                return self._denegar(
                    f"Path '{archivo}' no está en la lista de escritura permitida"
                )
            self._archivos_escritos += 1

        # 3. Verificar límite de llamadas bash
        if tool_name == "Bash":
            if self._llamadas_bash >= self.politica.max_llamadas_bash:
                return self._denegar(
                    f"Límite de {self.politica.max_llamadas_bash} "
                    f"comandos Bash por sesión alcanzado"
                )

            # Verificar comandos permitidos
            comando = tool_input.get("command", "").split()[0] if tool_input.get("command") else ""
            if self.politica.comandos_bash_permitidos and comando:
                if not any(comando == c for c in self.politica.comandos_bash_permitidos):
                    return self._denegar(
                        f"Comando '{comando}' no está en la lista de comandos permitidos"
                    )
            self._llamadas_bash += 1

        return {}

    def _path_en_lista(self, path: str, lista: List[str]) -> bool:
        if not lista:
            return True  # Si no hay restricciones, todo permitido
        for patron in lista:
            if path.startswith(patron) or patron == ".*":
                return True
        return False

    def _denegar(self, razon: str) -> dict:
        print(f"[PERMISOS] DENEGADO: {razon}")
        return {
            "hookSpecificOutput": {
                "permissionDecision": "deny",
                "permissionDecisionReason": razon
            }
        }


# Uso
async def main():
    import os
    entorno = os.environ.get("ENTORNO", "desarrollo")
    politica = Politica.para_produccion() if entorno == "produccion" else Politica.para_desarrollador()
    permisos = SistemaPermisos(politica)

    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher=".*",
                hooks=[PreToolUseHook(permisos.verificar)]
            )
        ]
    )

    async for event in query(
        prompt="Analiza los logs y genera un reporte",
        options=options
    ):
        pass

Hooks para testing

Mock de herramientas en tests

import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook

class MockHerramienta:
    """Intercepta llamadas a herramientas reales y retorna datos mock."""

    def __init__(self, tool_name: str, output_mock: dict):
        self.tool_name = tool_name
        self.output_mock = output_mock
        self.llamadas: list = []

    async def interceptar(self, event) -> dict:
        if event.tool_name == self.tool_name:
            self.llamadas.append({
                "input": event.tool_input,
                "tool_use_id": event.tool_use_id
            })
            # Override el output con el mock
            return {
                "hookSpecificOutput": {
                    "overrideToolUseOutput": self.output_mock
                }
            }
        return {}


# Test que verifica que el agente llama a las herramientas correctas
@pytest.mark.asyncio
async def test_agente_llama_bash():
    mock_bash = MockHerramienta(
        tool_name="Bash",
        output_mock={
            "type": "tool_result",
            "content": "Tests passed: 42 passed, 0 failed"
        }
    )

    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher="^Bash$",
                hooks=[PostToolUseHook(mock_bash.interceptar)]
            )
        ],
        max_turns=5
    )

    async for event in query(
        prompt="Ejecuta los tests del proyecto con pytest",
        options=options
    ):
        pass

    # Verificaciones
    assert len(mock_bash.llamadas) >= 1, "El agente debe haber llamado a Bash"
    comandos = [c["input"].get("command", "") for c in mock_bash.llamadas]
    assert any("pytest" in cmd for cmd in comandos), "Debe haber llamado a pytest"


# Test de dry-run: verificar intenciones sin ejecutar nada
@pytest.mark.asyncio
async def test_dry_run_agente():
    """Verifica qué haría el agente sin ejecutar nada realmente."""
    acciones_planeadas = []

    async def capturar_sin_ejecutar(event) -> dict:
        acciones_planeadas.append({
            "tool": event.tool_name,
            "input": event.tool_input
        })
        # Retornar mock en lugar de ejecutar
        return {
            "hookSpecificOutput": {
                "overrideToolUseOutput": {
                    "type": "tool_result",
                    "content": "OK (simulado)"
                }
            }
        }

    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher=".*",
                hooks=[PostToolUseHook(capturar_sin_ejecutar)]
            )
        ],
        max_turns=10
    )

    async for event in query(
        prompt="Refactoriza el módulo de autenticación",
        options=options
    ):
        pass

    print(f"El agente habría ejecutado {len(acciones_planeadas)} acciones:")
    for accion in acciones_planeadas:
        print(f"  - {accion['tool']}: {str(accion['input'])[:60]}")

    return acciones_planeadas

TypeScript equivalente para testing:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface AccionCapturada {
  tool: string;
  input: Record<string, unknown>;
}

async function dryRunAgente(prompt: string): Promise<AccionCapturada[]> {
  const acciones: AccionCapturada[] = [];

  const capturar = async (event: any) => {
    acciones.push({ tool: event.toolName, input: event.toolInput });
    return {
      hookSpecificOutput: {
        overrideToolUseOutput: { type: "tool_result", content: "OK (simulado)" }
      }
    };
  };

  const options: ClaudeCodeOptions = {
    hooks: [{
      matcher: ".*",
      hooks: [{ type: "PostToolUse", handler: capturar }]
    }],
    maxTurns: 10
  };

  for await (const _ of query({ prompt, options })) { /* consumir */ }

  return acciones;
}

// En test:
const acciones = await dryRunAgente("Refactoriza el módulo de auth");
console.assert(acciones.length > 0, "Debe planear al menos una acción");

Hooks de observabilidad

Métricas de uso de herramientas con Prometheus

import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook

@dataclass
class MetricasHerramienta:
    total_llamadas: int = 0
    total_errores: int = 0
    latencias_ms: List[float] = field(default_factory=list)

    @property
    def latencia_promedio_ms(self) -> float:
        if not self.latencias_ms:
            return 0.0
        return sum(self.latencias_ms) / len(self.latencias_ms)

    @property
    def latencia_p99_ms(self) -> float:
        if not self.latencias_ms:
            return 0.0
        sorted_lats = sorted(self.latencias_ms)
        idx = int(len(sorted_lats) * 0.99)
        return sorted_lats[min(idx, len(sorted_lats) - 1)]


class ColectorMetricas:
    def __init__(self):
        self._metricas: Dict[str, MetricasHerramienta] = defaultdict(MetricasHerramienta)
        self._tiempos_inicio: Dict[str, float] = {}

    async def pre_hook_tiempo(self, event) -> dict:
        """Registra el tiempo de inicio para calcular latencia."""
        self._tiempos_inicio[event.tool_use_id] = time.time()
        return {}

    async def post_hook_metricas(self, event) -> dict:
        """Calcula latencia y registra métricas después de cada tool call."""
        tool_name = event.tool_name
        metrica = self._metricas[tool_name]
        metrica.total_llamadas += 1

        if event.tool_output.get("isError"):
            metrica.total_errores += 1

        # Calcular latencia
        inicio = self._tiempos_inicio.pop(event.tool_use_id, None)
        if inicio:
            latencia_ms = (time.time() - inicio) * 1000
            metrica.latencias_ms.append(latencia_ms)

        return {}

    def generar_reporte(self) -> str:
        if not self._metricas:
            return "Sin métricas registradas"

        lineas = ["=== MÉTRICAS DE HERRAMIENTAS ==="]
        for tool, m in sorted(self._metricas.items()):
            tasa_error = (m.total_errores / m.total_llamadas * 100) if m.total_llamadas > 0 else 0
            lineas.append(
                f"{tool:40s} | llamadas: {m.total_llamadas:4d} | "
                f"errores: {m.total_errores:3d} ({tasa_error:5.1f}%) | "
                f"lat. avg: {m.latencia_promedio_ms:6.1f}ms | "
                f"lat. p99: {m.latencia_p99_ms:6.1f}ms"
            )
        return "\n".join(lineas)

    def exportar_prometheus(self) -> str:
        """Genera métricas en formato Prometheus text."""
        lineas = []
        for tool, m in self._metricas.items():
            tool_label = tool.replace(".", "_").replace("-", "_")
            lineas.append(f'agent_tool_calls_total{{tool="{tool}"}} {m.total_llamadas}')
            lineas.append(f'agent_tool_errors_total{{tool="{tool}"}} {m.total_errores}')
            if m.latencia_promedio_ms > 0:
                lineas.append(f'agent_tool_latency_ms{{tool="{tool}",quantile="0.5"}} {m.latencia_promedio_ms:.2f}')
                lineas.append(f'agent_tool_latency_ms{{tool="{tool}",quantile="0.99"}} {m.latencia_p99_ms:.2f}')
        return "\n".join(lineas)


# Uso con servidor HTTP para exponer métricas
async def main():
    colector = ColectorMetricas()

    options = ClaudeCodeOptions(
        hooks=[
            HookMatcher(
                matcher=".*",
                hooks=[
                    PreToolUseHook(colector.pre_hook_tiempo),
                    PostToolUseHook(colector.post_hook_metricas)
                ]
            )
        ],
        max_turns=20
    )

    async for event in query(
        prompt="Implementa un módulo Python completo con tests",
        options=options
    ):
        pass

    print(colector.generar_reporte())
    print("\n=== FORMATO PROMETHEUS ===")
    print(colector.exportar_prometheus())

asyncio.run(main())

Integración con OpenTelemetry

import asyncio
import time
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook

# Configurar OpenTelemetry
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agente-claude")


class HooksOpenTelemetry:
    def __init__(self):
        self._spans: dict = {}

    async def pre_hook_span(self, event) -> dict:
        """Inicia un span de tracing por cada tool call."""
        span = tracer.start_span(
            name=f"tool.{event.tool_name}",
            attributes={
                "tool.name": event.tool_name,
                "tool.use_id": event.tool_use_id,
                "session.id": event.session_id,
                "tool.input.preview": str(event.tool_input)[:200]
            }
        )
        self._spans[event.tool_use_id] = span
        return {}

    async def post_hook_span(self, event) -> dict:
        """Finaliza el span con información del resultado."""
        span = self._spans.pop(event.tool_use_id, None)
        if span:
            span.set_attribute("tool.error", event.tool_output.get("isError", False))
            span.end()
        return {}


otel_hooks = HooksOpenTelemetry()

options = ClaudeCodeOptions(
    hooks=[
        HookMatcher(
            matcher=".*",
            hooks=[
                PreToolUseHook(otel_hooks.pre_hook_span),
                PostToolUseHook(otel_hooks.post_hook_span)
            ]
        )
    ]
)

Anti-patrones con hooks

Qué NO hacer

# ❌ ANTI-PATRÓN 1: Hook síncrono que bloquea
# El SDK espera funciones async. Un hook síncrono no funciona.
def hook_sincrono_mal(event):  # Sin async
    time.sleep(2)  # Bloquea el event loop
    return {}

# ✅ CORRECTO
async def hook_async_bien(event):
    await asyncio.sleep(0)  # Cede control sin bloquear
    return {}


# ❌ ANTI-PATRÓN 2: Modificar demasiado el comportamiento
async def hook_que_cambia_todo(event) -> dict:
    # Cambiar TODOS los comandos bash a algo diferente es confuso y difícil de debuggear
    if event.tool_name == "Bash":
        return {
            "hookSpecificOutput": {
                "overrideToolUseInput": {
                    "command": "echo 'interceptado'"  # Reemplaza TODO
                }
            }
        }
    return {}


# ❌ ANTI-PATRÓN 3: Hook que puede causar ciclo infinito
# Si el hook llama a query() internamente, puede crear recursión
async def hook_recursivo_mal(event) -> dict:
    if event.tool_name == "Bash":
        # ¡NUNCA hagas esto! Llama a query() dentro de un hook
        async for sub_event in query(prompt="verifica el comando", options=options):
            pass
    return {}


# ❌ ANTI-PATRÓN 4: Ignorar excepciones silenciosamente
async def hook_ignora_errores_mal(event) -> dict:
    try:
        resultado = await operacion_que_puede_fallar(event)
    except:
        pass  # Silenciar todos los errores es peligroso
    return {}

# ✅ CORRECTO: Log el error y continuar
async def hook_maneja_errores_bien(event) -> dict:
    try:
        resultado = await operacion_que_puede_fallar(event)
        return resultado
    except SpecificException as e:
        print(f"[HOOK ERROR] {event.tool_name}: {e}")
        return {}  # Fail-open: permitir la acción si el hook falla
    except Exception as e:
        print(f"[HOOK ERROR CRÍTICO] Tipo inesperado: {type(e).__name__}: {e}")
        raise  # Re-lanzar errores inesperados para no ocultarlos

Ejemplo completo: Sistema de auditoría

Auditoría completa con SQLite que registra TODAS las acciones

import asyncio
import sqlite3
import json
import time
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook, StopHook

DB_AUDITORIA = "/tmp/auditoria_agente.db"

def inicializar_db_auditoria():
    conn = sqlite3.connect(DB_AUDITORIA)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS acciones (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id TEXT NOT NULL,
            tool_use_id TEXT NOT NULL,
            tool_name TEXT NOT NULL,
            tool_input TEXT,
            tool_output TEXT,
            duracion_ms REAL,
            fue_denegado BOOLEAN DEFAULT FALSE,
            razon_denegacion TEXT,
            timestamp_inicio TEXT NOT NULL,
            timestamp_fin TEXT
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS sesiones (
            session_id TEXT PRIMARY KEY,
            inicio TEXT NOT NULL,
            fin TEXT,
            total_acciones INTEGER DEFAULT 0,
            total_denegaciones INTEGER DEFAULT 0,
            resultado_final TEXT
        )
    """)
    conn.commit()
    return conn


class SistemaAuditoria:
    """Sistema de auditoría completo para el agente."""

    def __init__(self):
        self._conn = inicializar_db_auditoria()
        self._tiempos: dict = {}
        self._sesion_iniciada = False
        self._session_id: Optional[str] = None

    def _registrar_sesion(self, session_id: str):
        if not self._sesion_iniciada:
            self._session_id = session_id
            self._sesion_iniciada = True
            self._conn.execute(
                "INSERT OR IGNORE INTO sesiones (session_id, inicio) VALUES (?, ?)",
                (session_id, datetime.now().isoformat())
            )
            self._conn.commit()

    async def pre_hook(self, event) -> dict:
        """Registra el inicio de cada acción."""
        self._registrar_sesion(event.session_id)
        self._tiempos[event.tool_use_id] = time.time()

        self._conn.execute("""
            INSERT INTO acciones
            (session_id, tool_use_id, tool_name, tool_input, timestamp_inicio)
            VALUES (?, ?, ?, ?, ?)
        """, (
            event.session_id,
            event.tool_use_id,
            event.tool_name,
            json.dumps(event.tool_input, ensure_ascii=False, default=str)[:2000],
            datetime.now().isoformat()
        ))
        self._conn.commit()
        return {}

    async def post_hook(self, event) -> dict:
        """Actualiza el registro con el resultado y duración."""
        inicio = self._tiempos.pop(event.tool_use_id, time.time())
        duracion_ms = (time.time() - inicio) * 1000

        self._conn.execute("""
            UPDATE acciones
            SET tool_output = ?,
                duracion_ms = ?,
                timestamp_fin = ?
            WHERE tool_use_id = ?
        """, (
            json.dumps(event.tool_output, ensure_ascii=False, default=str)[:2000],
            duracion_ms,
            datetime.now().isoformat(),
            event.tool_use_id
        ))
        self._conn.commit()
        return {}

    async def hook_denegacion(self, resultado_original: dict, event) -> dict:
        """Registra cuando se deniega una acción (para usar como PreToolUse + post-process)."""
        permision = resultado_original.get("hookSpecificOutput", {}).get("permissionDecision")
        if permision == "deny":
            razon = resultado_original.get("hookSpecificOutput", {}).get("permissionDecisionReason", "")
            self._conn.execute("""
                UPDATE acciones
                SET fue_denegado = TRUE, razon_denegacion = ?
                WHERE tool_use_id = ?
            """, (razon[:500], event.tool_use_id))
            self._conn.execute("""
                UPDATE sesiones
                SET total_denegaciones = total_denegaciones + 1
                WHERE session_id = ?
            """, (event.session_id,))
            self._conn.commit()
        return resultado_original

    async def stop_hook(self, event) -> dict:
        """Genera reporte final y actualiza la sesión."""
        session_id = event.session_id if hasattr(event, 'session_id') else self._session_id

        if not session_id:
            return {}

        # Contar acciones de la sesión
        cursor = self._conn.execute(
            "SELECT COUNT(*) as total FROM acciones WHERE session_id = ?",
            (session_id,)
        )
        total = cursor.fetchone()[0]

        self._conn.execute("""
            UPDATE sesiones
            SET fin = ?, total_acciones = ?
            WHERE session_id = ?
        """, (datetime.now().isoformat(), total, session_id))
        self._conn.commit()

        # Generar reporte
        reporte = self._generar_reporte(session_id)
        print(reporte)

        # Guardar reporte en archivo
        ruta_reporte = f"/tmp/reporte_sesion_{session_id[:8]}.txt"
        with open(ruta_reporte, "w") as f:
            f.write(reporte)

        print(f"\nReporte guardado en: {ruta_reporte}")
        return {}

    def _generar_reporte(self, session_id: str) -> str:
        cursor = self._conn.execute("""
            SELECT
                tool_name,
                COUNT(*) as total,
                SUM(fue_denegado) as denegados,
                AVG(duracion_ms) as lat_avg,
                MAX(duracion_ms) as lat_max
            FROM acciones
            WHERE session_id = ?
            GROUP BY tool_name
            ORDER BY total DESC
        """, (session_id,))

        lineas = [
            f"\n{'='*70}",
            f"REPORTE DE AUDITORÍA",
            f"Sesión: {session_id[:16]}",
            f"Generado: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"{'='*70}",
            f"{'Herramienta':<35} {'Llamadas':>8} {'Denegadas':>9} {'Lat.Avg':>8} {'Lat.Max':>8}",
            f"{'-'*70}"
        ]

        for row in cursor.fetchall():
            tool, total, denegados, lat_avg, lat_max = row
            lineas.append(
                f"{tool:<35} {total:>8} {(denegados or 0):>9} "
                f"{(lat_avg or 0):>7.1f}ms {(lat_max or 0):>7.1f}ms"
            )

        lineas.append(f"{'='*70}")
        return "\n".join(lineas)

    def cerrar(self):
        self._conn.close()


# TypeScript equivalente para auditoria simple
EJEMPLO_TYPESCRIPT = """
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import Database from "better-sqlite3";

const db = new Database("/tmp/auditoria.db");
db.exec(`
  CREATE TABLE IF NOT EXISTS acciones (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT,
    tool_name TEXT,
    tool_input TEXT,
    duracion_ms REAL,
    timestamp TEXT
  )
`);

const tiempos = new Map<string, number>();

const preHook = async (event: any) => {
  tiempos.set(event.toolUseId, Date.now());
  db.prepare(
    "INSERT INTO acciones (session_id, tool_name, tool_input, timestamp) VALUES (?, ?, ?, ?)"
  ).run(event.sessionId, event.toolName, JSON.stringify(event.toolInput).slice(0, 500), new Date().toISOString());
  return {};
};

const postHook = async (event: any) => {
  const inicio = tiempos.get(event.toolUseId) ?? Date.now();
  const duracion = Date.now() - inicio;
  db.prepare(
    "UPDATE acciones SET duracion_ms = ? WHERE tool_name = ? ORDER BY id DESC LIMIT 1"
  ).run(duracion, event.toolName);
  tiempos.delete(event.toolUseId);
  return {};
};
"""


# Función principal que usa el sistema de auditoría
async def main():
    auditoria = SistemaAuditoria()

    try:
        options = ClaudeCodeOptions(
            hooks=[
                HookMatcher(
                    matcher=".*",
                    hooks=[
                        PreToolUseHook(auditoria.pre_hook),
                        PostToolUseHook(auditoria.post_hook)
                    ]
                )
            ],
            max_turns=15,
            system_prompt="Eres un agente de desarrollo. Implementa lo que se te pide de forma cuidadosa."
        )

        print("Iniciando agente con auditoría completa...")
        async for event in query(
            prompt="Crea un script Python que calcule los primeros 20 números de Fibonacci con memoización",
            options=options
        ):
            if hasattr(event, 'type') and event.type == 'assistant':
                for block in event.message.content:
                    if hasattr(block, 'text'):
                        print(block.text, end="", flush=True)
        print()

    finally:
        auditoria.cerrar()

asyncio.run(main())

Resumen del capítulo

mindmap
  root((Hooks))
    Tipos de Hook
      PreToolUse
        Validar input
        Denegar acción
        Transformar input
      PostToolUse
        Log resultado
        Modificar output
        Notificar
      Stop
        Reporte final
        Cleanup recursos
      Notification
        Redirigir mensajes
    HookMatcher
      Regex patterns
      Múltiples matchers
      allowed_tools
    Casos de Uso
      Seguridad
        Proteger archivos
        Bloquear comandos
        Sistema de permisos
      Observabilidad
        Métricas
        OpenTelemetry
        Prometheus
      Testing
        Mocks
        Dry-run
        Verificación
      Auditoría
        SQLite log
        Reporte final
    Composición
      Pipeline de validaciones
      HookManager
      Prioridades
    Anti-patrones
      Hooks síncronos
      Recursión
      Silenciar errores
      Demasiada latencia