Capítulo 6: Sistema de Hooks — Control Total del Agente
Capítulo 6: Sistema de Hooks — Control Total del Agente
¿Qué son los hooks?
Los hooks son callbacks que el SDK llama en puntos específicos del ciclo de vida agentic. Son el mecanismo que te da control total sobre lo que hace el agente: puedes observar cada acción, modificar entradas y salidas, bloquear operaciones peligrosas, o disparar efectos secundarios como notificaciones y métricas.
El ciclo de vida del agente con hooks
flowchart TD
START(["Inicio: query()"])
PROMPT["LLM procesa el prompt"]
RESP["LLM genera respuesta"]
TOOL_CALL{"¿La respuesta\nincluye tool call?"}
PRE["PreToolUse Hook\n(ANTES de ejecutar)"]
DECISION{"¿El hook\npermite ejecutar?"}
EXEC["Ejecutar herramienta"]
POST["PostToolUse Hook\n(DESPUÉS de ejecutar)"]
MODIFY{"¿El hook\nmodifica output?"}
SEND_RESULT["Enviar resultado al LLM"]
NOTIF["Notification Hook\n(mensajes al usuario)"]
MAX_TURNS{"¿Máximo de\nturns alcanzado?"}
STOP["Stop Hook\n(agente terminó)"]
END(["Fin: ResultMessage"])
START --> PROMPT
PROMPT --> RESP
RESP --> TOOL_CALL
TOOL_CALL -->|"Sí"| PRE
TOOL_CALL -->|"No (mensaje final)"| NOTIF
PRE --> DECISION
DECISION -->|"deny"| SEND_RESULT
DECISION -->|"allow"| EXEC
EXEC --> POST
POST --> MODIFY
MODIFY -->|"override output"| SEND_RESULT
MODIFY -->|"original output"| SEND_RESULT
SEND_RESULT --> PROMPT
NOTIF --> MAX_TURNS
MAX_TURNS -->|"Sí"| STOP
MAX_TURNS -->|"No"| PROMPT
STOP --> END
style PRE fill:#FF9800,color:#000
style POST fill:#4CAF50,color:#fff
style STOP fill:#F44336,color:#fff
style NOTIF fill:#2196F3,color:#fff
Lista completa de eventos de hook
| Evento | Cuándo se ejecuta | Puede modificar |
|---|---|---|
PreToolUse | Antes de ejecutar una herramienta | Input de la herramienta, puede denegar |
PostToolUse | Después de ejecutar una herramienta | Output de la herramienta |
Stop | Cuando el agente termina (éxito o max_turns) | No (solo observación) |
Notification | Cuando el agente envía mensajes informativos | Puede redirigir notificaciones |
Qué retorna un hook
Cada hook retorna un diccionario que le indica al SDK cómo proceder:
# 1. Continuar sin cambios
return {}
# 2. Denegar la ejecución de la herramienta (PreToolUse)
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": "No se permiten modificaciones a archivos .env"
}
}
# 3. Modificar el input antes de ejecutar (PreToolUse)
return {
"hookSpecificOutput": {
"overrideToolUseInput": {
"file_path": "/ruta/absoluta/corregida.py",
"content": "# Contenido modificado por el hook"
}
}
}
# 4. Modificar el output después de ejecutar (PostToolUse)
return {
"hookSpecificOutput": {
"overrideToolUseOutput": {
"type": "tool_result",
"content": "Output modificado por el hook PostToolUse"
}
}
}
HookMatcher — Configuración
Un HookMatcher combina un patrón regex con una lista de hooks. El SDK evalúa el nombre de la herramienta contra el matcher y ejecuta los hooks correspondientes.
Signature completa
Python:
from claude_code_sdk import HookMatcher, PreToolUseHook, PostToolUseHook
HookMatcher(
matcher="^(Write|Edit|MultiEdit)$", # Regex para tool_name
hooks=[ # Lista de callbacks
PreToolUseHook(mi_hook_pre),
PostToolUseHook(mi_hook_post)
]
)
TypeScript:
import { HookMatcher, PreToolUseHook } from "@anthropic-ai/claude-code-sdk";
const matcher: HookMatcher = {
matcher: "^(Write|Edit|MultiEdit)$",
hooks: [
{ type: "PreToolUse", handler: miHookPre },
{ type: "PostToolUse", handler: miHookPost }
]
};
Patrones matcher comunes
from claude_code_sdk import HookMatcher, PreToolUseHook
# Todos los tools
HookMatcher(matcher=".*", hooks=[...])
# Solo herramientas de escritura de archivos
HookMatcher(matcher="^(Write|Edit|MultiEdit)$", hooks=[...])
# Solo herramientas de bash y comandos
HookMatcher(matcher="^(Bash|Computer)$", hooks=[...])
# Todas las herramientas MCP
HookMatcher(matcher="^mcp__.*$", hooks=[...])
# Un servidor MCP específico
HookMatcher(matcher="^mcp__postgres__.*$", hooks=[...])
# Una herramienta específica
HookMatcher(matcher="^Bash$", hooks=[...])
# Cualquier herramienta de lectura
HookMatcher(matcher="^(Read|Glob|Grep|LS)$", hooks=[...])
Múltiples matchers en un agente
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook
async def hook_escritura(event):
"""Bloquea escritura en paths sensibles."""
tool_input = event.tool_input
if "file_path" in tool_input:
path = tool_input["file_path"]
if ".env" in path or "secrets" in path.lower():
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": f"Escritura en '{path}' no permitida"
}
}
return {}
async def hook_bash(event):
"""Bloquea comandos bash peligrosos."""
comando = event.tool_input.get("command", "")
if "rm -rf" in comando or "sudo" in comando:
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": f"Comando peligroso bloqueado: {comando[:50]}"
}
}
return {}
async def hook_log_todo(event):
"""Registra todas las herramientas ejecutadas."""
print(f"[LOG] Tool: {event.tool_name}")
return {}
async def main():
options = ClaudeCodeOptions(
hooks=[
# Hook 1: Proteger archivos sensibles (solo escrituras)
HookMatcher(
matcher="^(Write|Edit|MultiEdit)$",
hooks=[PreToolUseHook(hook_escritura)]
),
# Hook 2: Validar comandos bash
HookMatcher(
matcher="^Bash$",
hooks=[PreToolUseHook(hook_bash)]
),
# Hook 3: Log de todo (todos los tools)
HookMatcher(
matcher=".*",
hooks=[PreToolUseHook(hook_log_todo)]
)
]
)
async for event in query(
prompt="Crea un archivo test.py y ejecuta pytest",
options=options
):
pass
asyncio.run(main())
TypeScript equivalente:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const options: ClaudeCodeOptions = {
hooks: [
{
matcher: "^(Write|Edit|MultiEdit)$",
hooks: [{
type: "PreToolUse",
handler: async (event) => {
const path = event.toolInput?.file_path as string;
if (path?.includes(".env")) {
return {
hookSpecificOutput: {
permissionDecision: "deny",
permissionDecisionReason: "Escritura en .env no permitida"
}
};
}
return {};
}
}]
},
{
matcher: "^Bash$",
hooks: [{
type: "PreToolUse",
handler: async (event) => {
const cmd = event.toolInput?.command as string;
if (cmd?.includes("rm -rf")) {
return {
hookSpecificOutput: {
permissionDecision: "deny",
permissionDecisionReason: "rm -rf bloqueado por política"
}
};
}
return {};
}
}]
}
]
};
PreToolUse — Interceptar antes de ejecutar
PreToolUse se ejecuta antes de que la herramienta corra. Es el punto donde puedes:
- Validar: rechazar inputs inválidos o peligrosos
- Transformar: modificar el input antes de ejecutar
- Denegar: bloquear completamente la ejecución
- Auditar: registrar qué va a hacer el agente
Estructura del evento PreToolUse
@dataclass
class PreToolUseEvent:
tool_name: str # Nombre de la herramienta: "Bash", "Write", "mcp__..."
tool_input: dict # Parámetros que usará la herramienta
tool_use_id: str # ID único de esta ejecución
session_id: str # ID de la sesión del agente
Ejemplo 1: Validar que no se editen archivos .env
import asyncio
import os
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook
ARCHIVOS_PROTEGIDOS = {
".env", ".env.local", ".env.production", ".env.development",
"secrets.json", "credentials.json", "private.key",
".npmrc", ".pypirc"
}
async def proteger_archivos_sensibles(event) -> dict:
"""Bloquea escritura en archivos con secretos o configuración sensible."""
tool_input = event.tool_input
archivo = tool_input.get("file_path", "")
if not archivo:
return {}
# Verificar nombre de archivo
nombre_archivo = Path(archivo).name
if nombre_archivo in ARCHIVOS_PROTEGIDOS:
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": (
f"BLOQUEADO: El archivo '{nombre_archivo}' está protegido. "
f"No se permite modificar archivos de credenciales o configuración sensible. "
f"Si necesitas cambiar variables de entorno, hazlo manualmente."
)
}
}
# Verificar extensiones peligrosas
extension = Path(archivo).suffix.lower()
if extension in {".key", ".pem", ".p12", ".pfx", ".cert"}:
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": (
f"BLOQUEADO: No se permite modificar archivos de certificados "
f"o claves privadas (extensión: {extension})"
)
}
}
# Verificar paths dentro de directorios de secretos
partes_path = Path(archivo).parts
directorios_prohibidos = {"secrets", "credentials", ".ssh", "certs", "private"}
for parte in partes_path[:-1]: # Excluir el nombre del archivo
if parte.lower() in directorios_prohibidos:
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": (
f"BLOQUEADO: El directorio '{parte}' está protegido"
)
}
}
return {}
async def main():
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher="^(Write|Edit|MultiEdit)$",
hooks=[PreToolUseHook(proteger_archivos_sensibles)]
)
]
)
async for event in query(
prompt="Actualiza el archivo .env con la nueva API key: KEY=abc123",
options=options
):
if hasattr(event, 'type') and event.type == 'result':
print(f"Resultado: {event.result}")
asyncio.run(main())
Ejemplo 2: Bloquear comandos bash peligrosos
import re
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook
# Patrones de comandos peligrosos con su razón
PATRONES_PELIGROSOS = [
(r'\brm\s+-[rRfF]*\s*/', "rm recursivo en directorio raíz"),
(r'\brm\s+-rf\b', "rm -rf (borrado recursivo forzado)"),
(r'\bdd\s+if=', "comando dd (puede sobrescribir discos)"),
(r'\b:(){ :|:& };:', "fork bomb"),
(r'\bchmod\s+777\s+/', "chmod 777 en directorio raíz"),
(r'\bsudo\s+rm', "sudo rm (borrado con privilegios elevados)"),
(r'\bformat\s+[cCdD]:', "format de disco (Windows)"),
(r'\bmkfs\b', "mkfs (formateo de sistema de archivos)"),
(r'\bshred\b', "shred (borrado seguro irreversible)"),
(r'\bwget\s+.*\s*\|\s*(?:bash|sh|zsh)', "descarga y ejecución de script remoto"),
(r'\bcurl\s+.*\s*\|\s*(?:bash|sh|zsh)', "descarga y ejecución de script remoto"),
]
# Comandos en whitelist (siempre permitidos sin importar los patrones anteriores)
COMANDOS_SEGUROS_WHITELIST = {
"ls", "cat", "echo", "pwd", "date", "whoami",
"git status", "git log", "git diff"
}
async def validar_bash_seguro(event) -> dict:
"""Valida comandos bash contra lista de patrones peligrosos."""
comando = event.tool_input.get("command", "").strip()
if not comando:
return {}
# Verificar whitelist (permitir incondicionalmente)
for cmd_seguro in COMANDOS_SEGUROS_WHITELIST:
if comando.startswith(cmd_seguro):
return {}
# Verificar patrones peligrosos
for patron, descripcion in PATRONES_PELIGROSOS:
if re.search(patron, comando, re.IGNORECASE):
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": (
f"Comando bloqueado por política de seguridad: {descripcion}.\n"
f"Comando: {comando[:100]}"
)
}
}
# Log de comandos que sí se ejecutan
print(f"[BASH PERMITIDO] {comando[:80]}")
return {}
TypeScript equivalente:
const patronesPeligrosos = [
{ patron: /\brm\s+-rf\b/, razon: "rm -rf bloqueado" },
{ patron: /\bsudo\s+rm/, razon: "sudo rm bloqueado" },
{ patron: /curl.*\|\s*(bash|sh)/, razon: "pipe a shell bloqueado" },
];
const hookBashSeguro = async (event: any) => {
const comando: string = event.toolInput?.command ?? "";
for (const { patron, razon } of patronesPeligrosos) {
if (patron.test(comando)) {
return {
hookSpecificOutput: {
permissionDecision: "deny",
permissionDecisionReason: `Bloqueado: ${razon}. Comando: ${comando.slice(0, 80)}`
}
};
}
}
return {};
};
Ejemplo 3: Transformar paths relativos a absolutos
import asyncio
import os
from pathlib import Path
async def normalizar_paths(event) -> dict:
"""Convierte paths relativos a absolutos antes de ejecutar herramientas de archivo."""
tool_input = event.tool_input
modificado = dict(tool_input)
cambio_realizado = False
directorio_trabajo = os.getcwd()
for campo in ["file_path", "path", "source_path", "destination_path"]:
if campo in tool_input and tool_input[campo]:
path_original = tool_input[campo]
path_obj = Path(path_original)
if not path_obj.is_absolute():
path_absoluto = str(Path(directorio_trabajo) / path_obj)
modificado[campo] = path_absoluto
cambio_realizado = True
print(f"[PATH NORMALIZADO] {path_original} → {path_absoluto}")
if cambio_realizado:
return {
"hookSpecificOutput": {
"overrideToolUseInput": modificado
}
}
return {}
Ejemplo 4: Rate limiting por herramienta
import asyncio
import time
from collections import defaultdict
from typing import Dict, List
class RateLimiter:
def __init__(self):
# {tool_name: [timestamps de los últimos usos]}
self._historial: Dict[str, List[float]] = defaultdict(list)
# Límites por herramienta: (max_llamadas, ventana_segundos)
LIMITES = {
"Bash": (10, 60), # Máximo 10 Bash por minuto
"Write": (20, 60), # Máximo 20 escrituras por minuto
"mcp__postgres__.*": (50, 60), # Máximo 50 queries DB por minuto
".*": (100, 60) # Global: 100 herramientas por minuto
}
async def __call__(self, event) -> dict:
tool_name = event.tool_name
ahora = time.time()
# Determinar límite aplicable
max_llamadas, ventana = 100, 60 # Default
for patron, (max_c, vent) in self.LIMITES.items():
import re
if re.match(patron, tool_name):
max_llamadas, ventana = max_c, vent
break
# Limpiar historial antiguo
self._historial[tool_name] = [
t for t in self._historial[tool_name]
if ahora - t < ventana
]
# Verificar límite
if len(self._historial[tool_name]) >= max_llamadas:
tiempo_espera = ventana - (ahora - self._historial[tool_name][0])
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": (
f"Rate limit alcanzado para '{tool_name}': "
f"{max_llamadas} llamadas/{ventana}s. "
f"Espera {tiempo_espera:.0f}s."
)
}
}
self._historial[tool_name].append(ahora)
return {}
# Uso
rate_limiter = RateLimiter()
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[PreToolUseHook(rate_limiter)]
)
]
)
PostToolUse — Interceptar después de ejecutar
PostToolUse se ejecuta después de que la herramienta termina, con acceso tanto al input como al output. Es el punto ideal para logging, verificaciones post-ejecución y transformación de resultados.
Estructura del evento PostToolUse
@dataclass
class PostToolUseEvent:
tool_name: str # Nombre de la herramienta
tool_input: dict # Los parámetros que se usaron
tool_output: dict # El resultado de la herramienta
tool_use_id: str # ID único de esta ejecución
session_id: str # ID de la sesión
Ejemplo 1: Log de archivos editados
import asyncio
import json
from datetime import datetime
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PostToolUseHook
class RegistroEdiciones:
def __init__(self, archivo_log: str = "/tmp/ediciones_agente.jsonl"):
self.archivo_log = archivo_log
self._ediciones: list = []
async def registrar_edicion(self, event) -> dict:
"""Registra en log cada archivo que el agente modifica."""
tool_input = event.tool_input
tool_output = event.tool_output
tool_name = event.tool_name
entrada_log = {
"timestamp": datetime.now().isoformat(),
"tool": tool_name,
"session_id": event.session_id,
"archivo": tool_input.get("file_path", "desconocido"),
"exito": not tool_output.get("isError", False),
"tool_use_id": event.tool_use_id
}
# Agregar detalles específicos por tipo de herramienta
if tool_name == "Write":
entrada_log["tipo"] = "crear/sobrescribir"
entrada_log["lineas"] = tool_input.get("content", "").count("\n")
elif tool_name in ("Edit", "MultiEdit"):
entrada_log["tipo"] = "editar"
self._ediciones.append(entrada_log)
# Escribir en archivo JSONL
with open(self.archivo_log, "a") as f:
f.write(json.dumps(entrada_log, ensure_ascii=False) + "\n")
print(f"[EDICIÓN REGISTRADA] {entrada_log['archivo']} ({tool_name})")
return {} # No modificar el output
def resumen(self) -> str:
if not self._ediciones:
return "No se editaron archivos"
archivos_unicos = {e["archivo"] for e in self._ediciones}
return f"Archivos editados: {len(archivos_unicos)} únicos, {len(self._ediciones)} operaciones totales"
Ejemplo 2: Verificar que los tests pasan después de Edit
import asyncio
import subprocess
from pathlib import Path
async def verificar_tests_post_edit(event) -> dict:
"""Ejecuta tests automáticamente después de editar archivos Python."""
tool_input = event.tool_input
archivo = tool_input.get("file_path", "")
# Solo para archivos Python que no sean tests
if not archivo.endswith(".py") or "test_" in Path(archivo).name:
return {}
# Intentar encontrar y correr el archivo de test correspondiente
ruta = Path(archivo)
directorio = ruta.parent
nombre_base = ruta.stem
archivo_test = directorio / f"test_{nombre_base}.py"
if not archivo_test.exists():
return {}
print(f"[AUTO-TEST] Ejecutando {archivo_test}...")
resultado = subprocess.run(
["python", "-m", "pytest", str(archivo_test), "-v", "--tb=short"],
capture_output=True,
text=True,
timeout=30
)
if resultado.returncode != 0:
# Tests fallaron: informar al LLM
mensaje_error = (
f"ADVERTENCIA: Los tests fallaron después de editar {archivo}.\n"
f"Salida de pytest:\n{resultado.stdout[-500:]}" # últimas 500 chars
)
print(f"[AUTO-TEST FALLÓ] {mensaje_error}")
return {
"hookSpecificOutput": {
"overrideToolUseOutput": {
"type": "tool_result",
"content": (
f"Archivo editado exitosamente, PERO los tests fallaron:\n"
f"{resultado.stdout[-500:]}"
)
}
}
}
print(f"[AUTO-TEST PASÓ] {archivo_test}")
return {}
Ejemplo 3: Notificar por Slack cuando se completa una tarea
import asyncio
import httpx
import os
from datetime import datetime
async def notificar_slack_post_bash(event) -> dict:
"""Envía notificación a Slack cuando se ejecuta un comando Bash exitosamente."""
tool_input = event.tool_input
tool_output = event.tool_output
comando = tool_input.get("command", "")
# Solo notificar comandos de deploy o CI
palabras_clave_deploy = ["deploy", "docker push", "kubectl apply", "terraform apply"]
es_deploy = any(palabra in comando.lower() for palabra in palabras_clave_deploy)
if not es_deploy:
return {}
exito = not tool_output.get("isError", False)
emoji = "✅" if exito else "❌"
estado = "exitoso" if exito else "FALLIDO"
mensaje = {
"text": f"{emoji} Deploy {estado}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{emoji} Deploy {estado}*\n`{comando[:100]}`"
}
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": f"Hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Session: {event.session_id[:8]}"
}
]
}
]
}
slack_webhook = os.environ.get("SLACK_WEBHOOK_URL")
if slack_webhook:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(slack_webhook, json=mensaje)
except Exception as e:
print(f"[SLACK] Error enviando notificación: {e}")
return {}
async def main():
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher="^Bash$",
hooks=[PostToolUseHook(notificar_slack_post_bash)]
)
]
)
async for event in query(
prompt="Ejecuta el deployment con: ./deploy.sh --zero-downtime",
options=options
):
pass
TypeScript equivalente:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const notificarSlack = async (event: any) => {
const comando: string = event.toolInput?.command ?? "";
const esDeploy = ["deploy", "docker push", "kubectl"].some(k => comando.includes(k));
if (!esDeploy) return {};
const webhook = process.env.SLACK_WEBHOOK_URL;
if (!webhook) return {};
try {
await fetch(webhook, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: `Deploy ejecutado: \`${comando.slice(0, 80)}\`` })
});
} catch (e) {
console.error("Error Slack:", e);
}
return {};
};
const options: ClaudeCodeOptions = {
hooks: [{
matcher: "^Bash$",
hooks: [{ type: "PostToolUse", handler: notificarSlack }]
}]
};
Stop hook — Limpieza y reporte final
El Stop hook se ejecuta cuando el agente termina su trabajo, ya sea porque llegó a una respuesta final, alcanzó el máximo de turns, o fue interrumpido.
Casos de uso del Stop hook
import asyncio
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
from claude_code_sdk import query, ClaudeCodeOptions, StopHook
class ReporteFinal:
def __init__(self, email_destino: str):
self.email = email_destino
self._inicio = datetime.now()
self._acciones: list = []
def registrar_accion(self, tool_name: str):
self._acciones.append(tool_name)
async def enviar_reporte(self, event) -> dict:
"""Envía reporte por email cuando el agente termina."""
duracion = (datetime.now() - self._inicio).total_seconds()
# Contar herramientas usadas
from collections import Counter
conteo = Counter(self._acciones)
reporte = {
"session_id": event.session_id,
"duracion_segundos": duracion,
"total_acciones": len(self._acciones),
"herramientas_usadas": dict(conteo),
"resultado": event.result if hasattr(event, 'result') else "N/A",
"timestamp": datetime.now().isoformat()
}
print(f"\n{'='*50}")
print(f"REPORTE FINAL DE SESIÓN")
print(f"Duración: {duracion:.1f}s")
print(f"Acciones: {len(self._acciones)}")
print(f"Herramientas: {dict(conteo)}")
print('='*50)
# Guardar reporte en archivo
with open(f"/tmp/reporte_{event.session_id[:8]}.json", "w") as f:
json.dump(reporte, f, indent=2, ensure_ascii=False)
return {}
async def main():
reporte = ReporteFinal("[email protected]")
async def hook_log_acciones(event):
reporte.registrar_accion(event.tool_name)
return {}
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[PreToolUseHook(hook_log_acciones)]
)
],
on_stop=StopHook(reporte.enviar_reporte)
)
async for event in query(
prompt="Crea un módulo Python completo con tests y documentación",
options=options
):
pass
Cleanup de recursos
import asyncio
from claude_code_sdk import ClaudeCodeOptions, StopHook
class GestorRecursos:
def __init__(self):
self._conexiones_db = []
self._archivos_temp = []
self._tareas_fondo = []
def registrar_conexion(self, conn):
self._conexiones_db.append(conn)
def registrar_archivo_temp(self, path: str):
self._archivos_temp.append(path)
async def limpiar_todo(self, event) -> dict:
"""Limpia todos los recursos al terminar la sesión."""
import os
# Cerrar conexiones DB
for conn in self._conexiones_db:
try:
conn.close()
print(f"[CLEANUP] Conexión DB cerrada")
except Exception as e:
print(f"[CLEANUP] Error cerrando conexión: {e}")
# Eliminar archivos temporales
for path in self._archivos_temp:
try:
os.remove(path)
print(f"[CLEANUP] Archivo temporal eliminado: {path}")
except FileNotFoundError:
pass
except Exception as e:
print(f"[CLEANUP] Error eliminando {path}: {e}")
# Cancelar tareas de fondo
for tarea in self._tareas_fondo:
if not tarea.done():
tarea.cancel()
print(f"[CLEANUP] Sesión {event.session_id[:8]} terminada")
return {}
Notification hook — Redirigir mensajes del agente
El agente puede enviar notificaciones informativas mientras trabaja (progreso, advertencias, etc.). El Notification hook intercepta estos mensajes.
Configuración y uso
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, NotificationHook
async def redirigir_a_slack(event) -> dict:
"""Redirige las notificaciones del agente a un canal de Slack."""
mensaje = event.message if hasattr(event, 'message') else str(event)
import httpx
import os
webhook = os.environ.get("SLACK_WEBHOOK_URL")
if webhook:
try:
async with httpx.AsyncClient(timeout=3.0) as client:
await client.post(webhook, json={
"text": f"🤖 Agente: {mensaje}"
})
except Exception:
pass
return {}
async def notificacion_a_websocket(event) -> dict:
"""Envía notificaciones vía WebSocket para actualización en tiempo real."""
import websockets
import json
mensaje_ws = json.dumps({
"tipo": "notificacion_agente",
"mensaje": str(event),
"timestamp": asyncio.get_event_loop().time()
})
try:
async with websockets.connect("ws://localhost:8765") as ws:
await ws.send(mensaje_ws)
except Exception:
pass
return {}
async def main():
options = ClaudeCodeOptions(
notification_hook=NotificationHook(redirigir_a_slack)
)
async for event in query(
prompt="Implementa el módulo de autenticación completo",
options=options
):
pass
Context en hooks — Estado compartido entre hooks
El contexto del hook contiene información sobre la sesión actual que puedes usar para decisiones más inteligentes.
Qué contiene el contexto
@dataclass
class HookContext:
session_id: str # ID único de esta sesión del agente
# Los hooks pueden también acceder al historial via closures
Compartir estado entre hooks con clase
import asyncio
from pathlib import Path
from datetime import datetime
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook
class ContextoHooks:
"""Estado compartido entre múltiples hooks de la misma sesión."""
def __init__(self):
self.archivos_tocados: set = set()
self.herramientas_usadas: list = []
self.inicio_sesion = datetime.now()
self.ultimo_error: str = ""
async def pre_hook(self, event) -> dict:
"""Antes: registrar intención."""
self.herramientas_usadas.append({
"tool": event.tool_name,
"timestamp": datetime.now().isoformat(),
"input_preview": str(event.tool_input)[:100]
})
# Lógica de contexto: si ya tocamos demasiados archivos, advertir
if len(self.archivos_tocados) > 50:
print(f"[CONTEXTO] Advertencia: el agente ha tocado {len(self.archivos_tocados)} archivos")
return {}
async def post_hook(self, event) -> dict:
"""Después: registrar resultado."""
# Registrar archivos tocados
archivo = event.tool_input.get("file_path", "")
if archivo:
self.archivos_tocados.add(archivo)
if event.tool_output.get("isError"):
self.ultimo_error = f"{event.tool_name}: {str(event.tool_output)[:200]}"
return {}
def generar_resumen(self) -> dict:
duracion = (datetime.now() - self.inicio_sesion).total_seconds()
return {
"duracion_segundos": duracion,
"total_operaciones": len(self.herramientas_usadas),
"archivos_unicos": len(self.archivos_tocados),
"ultimo_error": self.ultimo_error
}
# Uso
ctx = ContextoHooks()
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[
PreToolUseHook(ctx.pre_hook),
PostToolUseHook(ctx.post_hook)
]
)
]
)
Hooks asíncronos
Todos los hooks deben ser funciones async. El SDK los espera (await) antes de continuar. Esto habilita operaciones IO sin bloquear.
Operaciones IO en hooks
import asyncio
import aiofiles
import httpx
from datetime import datetime
async def hook_log_a_archivo_async(event) -> dict:
"""Log asíncrono: no bloquea el hilo principal."""
entrada = f"{datetime.now().isoformat()} | {event.tool_name} | {event.tool_use_id}\n"
async with aiofiles.open("/tmp/agente_audit.log", "a") as f:
await f.write(entrada)
return {}
async def hook_verificar_api_externa(event) -> dict:
"""Verifica en API externa si la acción está permitida."""
tool_name = event.tool_name
tool_input = event.tool_input
try:
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.post(
"https://api.politicas.empresa.com/verificar",
json={
"accion": tool_name,
"input": str(tool_input)[:500],
"usuario": "agente-automatico"
}
)
datos = response.json()
if not datos.get("permitido", True):
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": datos.get(
"razon", "Acción no permitida por política empresarial"
)
}
}
except (httpx.TimeoutException, httpx.ConnectError):
# Si la API de políticas no está disponible, permitir (fail-open)
# O cambiar a fail-closed según tu política de seguridad
print(f"[ADVERTENCIA] API de políticas no disponible para {tool_name}")
return {}
Timeout de hooks
import asyncio
async def hook_con_timeout(event) -> dict:
"""Hook con timeout propio para evitar bloquear al agente."""
try:
resultado = await asyncio.wait_for(
mi_logica_async(event),
timeout=5.0 # máximo 5 segundos
)
return resultado
except asyncio.TimeoutError:
print(f"[WARNING] Hook timeout para {event.tool_name}, permitiendo continuar")
return {} # Fail-open: permitir si el hook tarda demasiado
async def mi_logica_async(event) -> dict:
# Lógica que podría tardar...
await asyncio.sleep(0) # Ceder control
return {}
Composición de hooks — Pipeline de validaciones
Múltiples hooks encadenados
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook
# Cada validador retorna (permitido: bool, razon: str)
class ValidadorPipeline:
def __init__(self, *validadores):
self.validadores = validadores
async def __call__(self, event) -> dict:
"""Ejecuta todos los validadores en secuencia. El primero que falla, detiene el pipeline."""
for validador in self.validadores:
resultado = await validador(event)
# Si algún validador retorna deny, detener aquí
if resultado.get("hookSpecificOutput", {}).get("permissionDecision") == "deny":
print(f"[PIPELINE] Bloqueado por: {validador.__name__}")
return resultado
return {}
# Validadores individuales (cada uno hace UNA cosa)
async def validar_no_env(event) -> dict:
archivo = event.tool_input.get("file_path", "")
if ".env" in archivo:
return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "No .env"}}
return {}
async def validar_no_produccion(event) -> dict:
archivo = event.tool_input.get("file_path", "")
if "/produccion/" in archivo or "/prod/" in archivo:
return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "No producción"}}
return {}
async def validar_tamaño_archivo(event) -> dict:
contenido = event.tool_input.get("content", "")
if len(contenido) > 100_000: # 100KB
return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "Archivo muy grande"}}
return {}
async def validar_encoding_utf8(event) -> dict:
contenido = event.tool_input.get("content", "")
try:
contenido.encode("utf-8")
except UnicodeEncodeError:
return {"hookSpecificOutput": {"permissionDecision": "deny", "permissionDecisionReason": "Encoding inválido"}}
return {}
# Combinar todos en un pipeline
pipeline_escritura = ValidadorPipeline(
validar_no_env,
validar_no_produccion,
validar_tamaño_archivo,
validar_encoding_utf8
)
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher="^(Write|Edit|MultiEdit)$",
hooks=[PreToolUseHook(pipeline_escritura)]
)
]
)
Hook manager pattern
from typing import List, Callable, Dict, Any
import asyncio
class HookManager:
"""Gestiona colecciones de hooks con prioridades y condiciones."""
def __init__(self):
self._pre_hooks: List[tuple[int, Callable]] = []
self._post_hooks: List[tuple[int, Callable]] = []
def agregar_pre_hook(self, hook: Callable, prioridad: int = 0):
"""Agrega un pre-hook. Mayor prioridad = se ejecuta primero."""
self._pre_hooks.append((prioridad, hook))
self._pre_hooks.sort(key=lambda x: -x[0])
def agregar_post_hook(self, hook: Callable, prioridad: int = 0):
self._post_hooks.append((prioridad, hook))
self._post_hooks.sort(key=lambda x: -x[0])
async def ejecutar_pre(self, event) -> Dict[str, Any]:
for _, hook in self._pre_hooks:
resultado = await hook(event)
if resultado.get("hookSpecificOutput", {}).get("permissionDecision") == "deny":
return resultado
return {}
async def ejecutar_post(self, event) -> Dict[str, Any]:
for _, hook in self._post_hooks:
resultado = await hook(event)
if resultado:
return resultado
return {}
# Uso del HookManager
manager = HookManager()
manager.agregar_pre_hook(validar_no_env, prioridad=100) # Alta prioridad
manager.agregar_pre_hook(validar_no_produccion, prioridad=90)
manager.agregar_pre_hook(validar_tamaño_archivo, prioridad=50)
manager.agregar_pre_hook(hook_log_todo, prioridad=0) # Baja prioridad
Sistema de permisos con hooks
Implementación completa de permisos empresariales
import asyncio
import json
from enum import Enum
from dataclasses import dataclass, field
from typing import Set, Dict, Optional, List
from pathlib import Path
class Permiso(Enum):
LEER = "leer"
ESCRIBIR = "escribir"
EJECUTAR = "ejecutar"
BORRAR = "borrar"
ADMIN = "admin"
@dataclass
class Politica:
"""Define qué puede y no puede hacer el agente."""
herramientas_permitidas: Set[str] = field(default_factory=set)
herramientas_prohibidas: Set[str] = field(default_factory=set)
paths_escritura_permitidos: List[str] = field(default_factory=list)
paths_lectura_permitidos: List[str] = field(default_factory=list)
comandos_bash_permitidos: List[str] = field(default_factory=list)
max_archivos_por_sesion: int = 100
max_llamadas_bash: int = 50
@classmethod
def para_desarrollador(cls) -> "Politica":
"""Política permisiva para entorno de desarrollo."""
return cls(
herramientas_permitidas={".*"}, # Todas
herramientas_prohibidas={"Computer"}, # Except control de GUI
paths_escritura_permitidos=["/home", "/tmp", "/workspace"],
paths_lectura_permitidos=[".*"],
max_archivos_por_sesion=500
)
@classmethod
def para_produccion(cls) -> "Politica":
"""Política restrictiva para entorno de producción."""
return cls(
herramientas_permitidas={"Read", "Bash"}, # Solo lectura + bash limitado
herramientas_prohibidas={"Write", "Edit", "MultiEdit", "Computer"},
paths_escritura_permitidos=["/tmp"],
paths_lectura_permitidos=["/app", "/var/log"],
comandos_bash_permitidos=["ls", "grep", "cat", "tail", "ps", "df", "free"],
max_archivos_por_sesion=10,
max_llamadas_bash=20
)
class SistemaPermisos:
def __init__(self, politica: Politica):
self.politica = politica
self._archivos_escritos: int = 0
self._llamadas_bash: int = 0
async def verificar(self, event) -> dict:
"""Verifica si la acción está permitida por la política."""
tool_name = event.tool_name
tool_input = event.tool_input
# 1. Verificar herramientas prohibidas
import re
for prohibida in self.politica.herramientas_prohibidas:
if re.match(prohibida, tool_name):
return self._denegar(f"Herramienta '{tool_name}' está prohibida por política")
# 2. Verificar límite de archivos escritos
if tool_name in ("Write", "Edit", "MultiEdit"):
if self._archivos_escritos >= self.politica.max_archivos_por_sesion:
return self._denegar(
f"Límite de {self.politica.max_archivos_por_sesion} "
f"archivos por sesión alcanzado"
)
# Verificar paths permitidos para escritura
archivo = tool_input.get("file_path", "")
if archivo and not self._path_en_lista(archivo, self.politica.paths_escritura_permitidos):
return self._denegar(
f"Path '{archivo}' no está en la lista de escritura permitida"
)
self._archivos_escritos += 1
# 3. Verificar límite de llamadas bash
if tool_name == "Bash":
if self._llamadas_bash >= self.politica.max_llamadas_bash:
return self._denegar(
f"Límite de {self.politica.max_llamadas_bash} "
f"comandos Bash por sesión alcanzado"
)
# Verificar comandos permitidos
comando = tool_input.get("command", "").split()[0] if tool_input.get("command") else ""
if self.politica.comandos_bash_permitidos and comando:
if not any(comando == c for c in self.politica.comandos_bash_permitidos):
return self._denegar(
f"Comando '{comando}' no está en la lista de comandos permitidos"
)
self._llamadas_bash += 1
return {}
def _path_en_lista(self, path: str, lista: List[str]) -> bool:
if not lista:
return True # Si no hay restricciones, todo permitido
for patron in lista:
if path.startswith(patron) or patron == ".*":
return True
return False
def _denegar(self, razon: str) -> dict:
print(f"[PERMISOS] DENEGADO: {razon}")
return {
"hookSpecificOutput": {
"permissionDecision": "deny",
"permissionDecisionReason": razon
}
}
# Uso
async def main():
import os
entorno = os.environ.get("ENTORNO", "desarrollo")
politica = Politica.para_produccion() if entorno == "produccion" else Politica.para_desarrollador()
permisos = SistemaPermisos(politica)
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[PreToolUseHook(permisos.verificar)]
)
]
)
async for event in query(
prompt="Analiza los logs y genera un reporte",
options=options
):
pass
Hooks para testing
Mock de herramientas en tests
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook
class MockHerramienta:
"""Intercepta llamadas a herramientas reales y retorna datos mock."""
def __init__(self, tool_name: str, output_mock: dict):
self.tool_name = tool_name
self.output_mock = output_mock
self.llamadas: list = []
async def interceptar(self, event) -> dict:
if event.tool_name == self.tool_name:
self.llamadas.append({
"input": event.tool_input,
"tool_use_id": event.tool_use_id
})
# Override el output con el mock
return {
"hookSpecificOutput": {
"overrideToolUseOutput": self.output_mock
}
}
return {}
# Test que verifica que el agente llama a las herramientas correctas
@pytest.mark.asyncio
async def test_agente_llama_bash():
mock_bash = MockHerramienta(
tool_name="Bash",
output_mock={
"type": "tool_result",
"content": "Tests passed: 42 passed, 0 failed"
}
)
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher="^Bash$",
hooks=[PostToolUseHook(mock_bash.interceptar)]
)
],
max_turns=5
)
async for event in query(
prompt="Ejecuta los tests del proyecto con pytest",
options=options
):
pass
# Verificaciones
assert len(mock_bash.llamadas) >= 1, "El agente debe haber llamado a Bash"
comandos = [c["input"].get("command", "") for c in mock_bash.llamadas]
assert any("pytest" in cmd for cmd in comandos), "Debe haber llamado a pytest"
# Test de dry-run: verificar intenciones sin ejecutar nada
@pytest.mark.asyncio
async def test_dry_run_agente():
"""Verifica qué haría el agente sin ejecutar nada realmente."""
acciones_planeadas = []
async def capturar_sin_ejecutar(event) -> dict:
acciones_planeadas.append({
"tool": event.tool_name,
"input": event.tool_input
})
# Retornar mock en lugar de ejecutar
return {
"hookSpecificOutput": {
"overrideToolUseOutput": {
"type": "tool_result",
"content": "OK (simulado)"
}
}
}
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[PostToolUseHook(capturar_sin_ejecutar)]
)
],
max_turns=10
)
async for event in query(
prompt="Refactoriza el módulo de autenticación",
options=options
):
pass
print(f"El agente habría ejecutado {len(acciones_planeadas)} acciones:")
for accion in acciones_planeadas:
print(f" - {accion['tool']}: {str(accion['input'])[:60]}")
return acciones_planeadas
TypeScript equivalente para testing:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface AccionCapturada {
tool: string;
input: Record<string, unknown>;
}
async function dryRunAgente(prompt: string): Promise<AccionCapturada[]> {
const acciones: AccionCapturada[] = [];
const capturar = async (event: any) => {
acciones.push({ tool: event.toolName, input: event.toolInput });
return {
hookSpecificOutput: {
overrideToolUseOutput: { type: "tool_result", content: "OK (simulado)" }
}
};
};
const options: ClaudeCodeOptions = {
hooks: [{
matcher: ".*",
hooks: [{ type: "PostToolUse", handler: capturar }]
}],
maxTurns: 10
};
for await (const _ of query({ prompt, options })) { /* consumir */ }
return acciones;
}
// En test:
const acciones = await dryRunAgente("Refactoriza el módulo de auth");
console.assert(acciones.length > 0, "Debe planear al menos una acción");
Hooks de observabilidad
Métricas de uso de herramientas con Prometheus
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook
@dataclass
class MetricasHerramienta:
total_llamadas: int = 0
total_errores: int = 0
latencias_ms: List[float] = field(default_factory=list)
@property
def latencia_promedio_ms(self) -> float:
if not self.latencias_ms:
return 0.0
return sum(self.latencias_ms) / len(self.latencias_ms)
@property
def latencia_p99_ms(self) -> float:
if not self.latencias_ms:
return 0.0
sorted_lats = sorted(self.latencias_ms)
idx = int(len(sorted_lats) * 0.99)
return sorted_lats[min(idx, len(sorted_lats) - 1)]
class ColectorMetricas:
def __init__(self):
self._metricas: Dict[str, MetricasHerramienta] = defaultdict(MetricasHerramienta)
self._tiempos_inicio: Dict[str, float] = {}
async def pre_hook_tiempo(self, event) -> dict:
"""Registra el tiempo de inicio para calcular latencia."""
self._tiempos_inicio[event.tool_use_id] = time.time()
return {}
async def post_hook_metricas(self, event) -> dict:
"""Calcula latencia y registra métricas después de cada tool call."""
tool_name = event.tool_name
metrica = self._metricas[tool_name]
metrica.total_llamadas += 1
if event.tool_output.get("isError"):
metrica.total_errores += 1
# Calcular latencia
inicio = self._tiempos_inicio.pop(event.tool_use_id, None)
if inicio:
latencia_ms = (time.time() - inicio) * 1000
metrica.latencias_ms.append(latencia_ms)
return {}
def generar_reporte(self) -> str:
if not self._metricas:
return "Sin métricas registradas"
lineas = ["=== MÉTRICAS DE HERRAMIENTAS ==="]
for tool, m in sorted(self._metricas.items()):
tasa_error = (m.total_errores / m.total_llamadas * 100) if m.total_llamadas > 0 else 0
lineas.append(
f"{tool:40s} | llamadas: {m.total_llamadas:4d} | "
f"errores: {m.total_errores:3d} ({tasa_error:5.1f}%) | "
f"lat. avg: {m.latencia_promedio_ms:6.1f}ms | "
f"lat. p99: {m.latencia_p99_ms:6.1f}ms"
)
return "\n".join(lineas)
def exportar_prometheus(self) -> str:
"""Genera métricas en formato Prometheus text."""
lineas = []
for tool, m in self._metricas.items():
tool_label = tool.replace(".", "_").replace("-", "_")
lineas.append(f'agent_tool_calls_total{{tool="{tool}"}} {m.total_llamadas}')
lineas.append(f'agent_tool_errors_total{{tool="{tool}"}} {m.total_errores}')
if m.latencia_promedio_ms > 0:
lineas.append(f'agent_tool_latency_ms{{tool="{tool}",quantile="0.5"}} {m.latencia_promedio_ms:.2f}')
lineas.append(f'agent_tool_latency_ms{{tool="{tool}",quantile="0.99"}} {m.latencia_p99_ms:.2f}')
return "\n".join(lineas)
# Uso con servidor HTTP para exponer métricas
async def main():
colector = ColectorMetricas()
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[
PreToolUseHook(colector.pre_hook_tiempo),
PostToolUseHook(colector.post_hook_metricas)
]
)
],
max_turns=20
)
async for event in query(
prompt="Implementa un módulo Python completo con tests",
options=options
):
pass
print(colector.generar_reporte())
print("\n=== FORMATO PROMETHEUS ===")
print(colector.exportar_prometheus())
asyncio.run(main())
Integración con OpenTelemetry
import asyncio
import time
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook
# Configurar OpenTelemetry
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agente-claude")
class HooksOpenTelemetry:
def __init__(self):
self._spans: dict = {}
async def pre_hook_span(self, event) -> dict:
"""Inicia un span de tracing por cada tool call."""
span = tracer.start_span(
name=f"tool.{event.tool_name}",
attributes={
"tool.name": event.tool_name,
"tool.use_id": event.tool_use_id,
"session.id": event.session_id,
"tool.input.preview": str(event.tool_input)[:200]
}
)
self._spans[event.tool_use_id] = span
return {}
async def post_hook_span(self, event) -> dict:
"""Finaliza el span con información del resultado."""
span = self._spans.pop(event.tool_use_id, None)
if span:
span.set_attribute("tool.error", event.tool_output.get("isError", False))
span.end()
return {}
otel_hooks = HooksOpenTelemetry()
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[
PreToolUseHook(otel_hooks.pre_hook_span),
PostToolUseHook(otel_hooks.post_hook_span)
]
)
]
)
Anti-patrones con hooks
Qué NO hacer
# ❌ ANTI-PATRÓN 1: Hook síncrono que bloquea
# El SDK espera funciones async. Un hook síncrono no funciona.
def hook_sincrono_mal(event): # Sin async
time.sleep(2) # Bloquea el event loop
return {}
# ✅ CORRECTO
async def hook_async_bien(event):
await asyncio.sleep(0) # Cede control sin bloquear
return {}
# ❌ ANTI-PATRÓN 2: Modificar demasiado el comportamiento
async def hook_que_cambia_todo(event) -> dict:
# Cambiar TODOS los comandos bash a algo diferente es confuso y difícil de debuggear
if event.tool_name == "Bash":
return {
"hookSpecificOutput": {
"overrideToolUseInput": {
"command": "echo 'interceptado'" # Reemplaza TODO
}
}
}
return {}
# ❌ ANTI-PATRÓN 3: Hook que puede causar ciclo infinito
# Si el hook llama a query() internamente, puede crear recursión
async def hook_recursivo_mal(event) -> dict:
if event.tool_name == "Bash":
# ¡NUNCA hagas esto! Llama a query() dentro de un hook
async for sub_event in query(prompt="verifica el comando", options=options):
pass
return {}
# ❌ ANTI-PATRÓN 4: Ignorar excepciones silenciosamente
async def hook_ignora_errores_mal(event) -> dict:
try:
resultado = await operacion_que_puede_fallar(event)
except:
pass # Silenciar todos los errores es peligroso
return {}
# ✅ CORRECTO: Log el error y continuar
async def hook_maneja_errores_bien(event) -> dict:
try:
resultado = await operacion_que_puede_fallar(event)
return resultado
except SpecificException as e:
print(f"[HOOK ERROR] {event.tool_name}: {e}")
return {} # Fail-open: permitir la acción si el hook falla
except Exception as e:
print(f"[HOOK ERROR CRÍTICO] Tipo inesperado: {type(e).__name__}: {e}")
raise # Re-lanzar errores inesperados para no ocultarlos
Ejemplo completo: Sistema de auditoría
Auditoría completa con SQLite que registra TODAS las acciones
import asyncio
import sqlite3
import json
import time
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List
from claude_code_sdk import query, ClaudeCodeOptions, HookMatcher, PreToolUseHook, PostToolUseHook, StopHook
DB_AUDITORIA = "/tmp/auditoria_agente.db"
def inicializar_db_auditoria():
conn = sqlite3.connect(DB_AUDITORIA)
conn.execute("""
CREATE TABLE IF NOT EXISTS acciones (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
tool_use_id TEXT NOT NULL,
tool_name TEXT NOT NULL,
tool_input TEXT,
tool_output TEXT,
duracion_ms REAL,
fue_denegado BOOLEAN DEFAULT FALSE,
razon_denegacion TEXT,
timestamp_inicio TEXT NOT NULL,
timestamp_fin TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS sesiones (
session_id TEXT PRIMARY KEY,
inicio TEXT NOT NULL,
fin TEXT,
total_acciones INTEGER DEFAULT 0,
total_denegaciones INTEGER DEFAULT 0,
resultado_final TEXT
)
""")
conn.commit()
return conn
class SistemaAuditoria:
"""Sistema de auditoría completo para el agente."""
def __init__(self):
self._conn = inicializar_db_auditoria()
self._tiempos: dict = {}
self._sesion_iniciada = False
self._session_id: Optional[str] = None
def _registrar_sesion(self, session_id: str):
if not self._sesion_iniciada:
self._session_id = session_id
self._sesion_iniciada = True
self._conn.execute(
"INSERT OR IGNORE INTO sesiones (session_id, inicio) VALUES (?, ?)",
(session_id, datetime.now().isoformat())
)
self._conn.commit()
async def pre_hook(self, event) -> dict:
"""Registra el inicio de cada acción."""
self._registrar_sesion(event.session_id)
self._tiempos[event.tool_use_id] = time.time()
self._conn.execute("""
INSERT INTO acciones
(session_id, tool_use_id, tool_name, tool_input, timestamp_inicio)
VALUES (?, ?, ?, ?, ?)
""", (
event.session_id,
event.tool_use_id,
event.tool_name,
json.dumps(event.tool_input, ensure_ascii=False, default=str)[:2000],
datetime.now().isoformat()
))
self._conn.commit()
return {}
async def post_hook(self, event) -> dict:
"""Actualiza el registro con el resultado y duración."""
inicio = self._tiempos.pop(event.tool_use_id, time.time())
duracion_ms = (time.time() - inicio) * 1000
self._conn.execute("""
UPDATE acciones
SET tool_output = ?,
duracion_ms = ?,
timestamp_fin = ?
WHERE tool_use_id = ?
""", (
json.dumps(event.tool_output, ensure_ascii=False, default=str)[:2000],
duracion_ms,
datetime.now().isoformat(),
event.tool_use_id
))
self._conn.commit()
return {}
async def hook_denegacion(self, resultado_original: dict, event) -> dict:
"""Registra cuando se deniega una acción (para usar como PreToolUse + post-process)."""
permision = resultado_original.get("hookSpecificOutput", {}).get("permissionDecision")
if permision == "deny":
razon = resultado_original.get("hookSpecificOutput", {}).get("permissionDecisionReason", "")
self._conn.execute("""
UPDATE acciones
SET fue_denegado = TRUE, razon_denegacion = ?
WHERE tool_use_id = ?
""", (razon[:500], event.tool_use_id))
self._conn.execute("""
UPDATE sesiones
SET total_denegaciones = total_denegaciones + 1
WHERE session_id = ?
""", (event.session_id,))
self._conn.commit()
return resultado_original
async def stop_hook(self, event) -> dict:
"""Genera reporte final y actualiza la sesión."""
session_id = event.session_id if hasattr(event, 'session_id') else self._session_id
if not session_id:
return {}
# Contar acciones de la sesión
cursor = self._conn.execute(
"SELECT COUNT(*) as total FROM acciones WHERE session_id = ?",
(session_id,)
)
total = cursor.fetchone()[0]
self._conn.execute("""
UPDATE sesiones
SET fin = ?, total_acciones = ?
WHERE session_id = ?
""", (datetime.now().isoformat(), total, session_id))
self._conn.commit()
# Generar reporte
reporte = self._generar_reporte(session_id)
print(reporte)
# Guardar reporte en archivo
ruta_reporte = f"/tmp/reporte_sesion_{session_id[:8]}.txt"
with open(ruta_reporte, "w") as f:
f.write(reporte)
print(f"\nReporte guardado en: {ruta_reporte}")
return {}
def _generar_reporte(self, session_id: str) -> str:
cursor = self._conn.execute("""
SELECT
tool_name,
COUNT(*) as total,
SUM(fue_denegado) as denegados,
AVG(duracion_ms) as lat_avg,
MAX(duracion_ms) as lat_max
FROM acciones
WHERE session_id = ?
GROUP BY tool_name
ORDER BY total DESC
""", (session_id,))
lineas = [
f"\n{'='*70}",
f"REPORTE DE AUDITORÍA",
f"Sesión: {session_id[:16]}",
f"Generado: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"{'='*70}",
f"{'Herramienta':<35} {'Llamadas':>8} {'Denegadas':>9} {'Lat.Avg':>8} {'Lat.Max':>8}",
f"{'-'*70}"
]
for row in cursor.fetchall():
tool, total, denegados, lat_avg, lat_max = row
lineas.append(
f"{tool:<35} {total:>8} {(denegados or 0):>9} "
f"{(lat_avg or 0):>7.1f}ms {(lat_max or 0):>7.1f}ms"
)
lineas.append(f"{'='*70}")
return "\n".join(lineas)
def cerrar(self):
self._conn.close()
# TypeScript equivalente para auditoria simple
EJEMPLO_TYPESCRIPT = """
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import Database from "better-sqlite3";
const db = new Database("/tmp/auditoria.db");
db.exec(`
CREATE TABLE IF NOT EXISTS acciones (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT,
tool_name TEXT,
tool_input TEXT,
duracion_ms REAL,
timestamp TEXT
)
`);
const tiempos = new Map<string, number>();
const preHook = async (event: any) => {
tiempos.set(event.toolUseId, Date.now());
db.prepare(
"INSERT INTO acciones (session_id, tool_name, tool_input, timestamp) VALUES (?, ?, ?, ?)"
).run(event.sessionId, event.toolName, JSON.stringify(event.toolInput).slice(0, 500), new Date().toISOString());
return {};
};
const postHook = async (event: any) => {
const inicio = tiempos.get(event.toolUseId) ?? Date.now();
const duracion = Date.now() - inicio;
db.prepare(
"UPDATE acciones SET duracion_ms = ? WHERE tool_name = ? ORDER BY id DESC LIMIT 1"
).run(duracion, event.toolName);
tiempos.delete(event.toolUseId);
return {};
};
"""
# Función principal que usa el sistema de auditoría
async def main():
auditoria = SistemaAuditoria()
try:
options = ClaudeCodeOptions(
hooks=[
HookMatcher(
matcher=".*",
hooks=[
PreToolUseHook(auditoria.pre_hook),
PostToolUseHook(auditoria.post_hook)
]
)
],
max_turns=15,
system_prompt="Eres un agente de desarrollo. Implementa lo que se te pide de forma cuidadosa."
)
print("Iniciando agente con auditoría completa...")
async for event in query(
prompt="Crea un script Python que calcule los primeros 20 números de Fibonacci con memoización",
options=options
):
if hasattr(event, 'type') and event.type == 'assistant':
for block in event.message.content:
if hasattr(block, 'text'):
print(block.text, end="", flush=True)
print()
finally:
auditoria.cerrar()
asyncio.run(main())
Resumen del capítulo
mindmap
root((Hooks))
Tipos de Hook
PreToolUse
Validar input
Denegar acción
Transformar input
PostToolUse
Log resultado
Modificar output
Notificar
Stop
Reporte final
Cleanup recursos
Notification
Redirigir mensajes
HookMatcher
Regex patterns
Múltiples matchers
allowed_tools
Casos de Uso
Seguridad
Proteger archivos
Bloquear comandos
Sistema de permisos
Observabilidad
Métricas
OpenTelemetry
Prometheus
Testing
Mocks
Dry-run
Verificación
Auditoría
SQLite log
Reporte final
Composición
Pipeline de validaciones
HookManager
Prioridades
Anti-patrones
Hooks síncronos
Recursión
Silenciar errores
Demasiada latencia