Capítulo 7: Subagentes y Orquestación
Capítulo 7: Subagentes y Orquestación
Los sistemas de un solo agente tienen límites naturales: un contexto finito, una sola línea de ejecución, y dificultad para especializar comportamiento. Los subagentes permiten dividir problemas complejos en tareas manejables, ejecutarlas en paralelo o en secuencia, y combinar resultados de manera confiable.
1. ¿Qué son los subagentes?
Un subagente es una instancia del SDK lanzada por otro agente (el orquestador o coordinador). El orquestador no ejecuta directamente las tareas — las delega a workers especializados y sintetiza sus resultados.
Jerarquía de agentes
graph TD
U[Usuario / Aplicación]
O[Orquestador]
W1[Worker: Investigación]
W2[Worker: Análisis]
W3[Worker: Redacción]
S1[Subworker A]
S2[Subworker B]
U -->|query principal| O
O -->|delega tarea 1| W1
O -->|delega tarea 2| W2
O -->|delega tarea 3| W3
W2 -->|subtarea| S1
W2 -->|subtarea| S2
W1 -->|resultado| O
W2 -->|resultado| O
W3 -->|resultado| O
O -->|respuesta final| U
La herramienta Task vs lanzar un subagente manualmente
El SDK expone dos mecanismos para delegar trabajo:
| Mecanismo | Descripción | Cuándo usar |
|---|---|---|
Task tool (interna) | Claude puede invocar subagentes a través de la herramienta Task cuando el modelo lo decide | Cuando quieres que el modelo decida cuándo delegar |
| Subagente programático | Tu código lanza explícitamente un nuevo query() con su propio ClaudeCodeOptions | Cuando tú controlas cuándo y cómo delegar |
En producción, el patrón más robusto es el subagente programático: tu código orquesta explícitamente las llamadas, tienes control total sobre errores, reintentos y paralelismo.
Diferencia conceptual: Agente vs Sistema multi-agente
graph LR
subgraph "Agente simple"
A1[query] --> A2[herramientas] --> A3[resultado]
end
subgraph "Sistema multi-agente"
B1[orquestador] --> B2[worker 1]
B1 --> B3[worker 2]
B1 --> B4[worker 3]
B2 --> B5[resultado parcial 1]
B3 --> B6[resultado parcial 2]
B4 --> B7[resultado parcial 3]
B5 --> B8[síntesis]
B6 --> B8
B7 --> B8
end
Ventajas de los sistemas multi-agente
Los sistemas multi-agente no son simplemente “más agentes” — resuelven problemas cualitativos que un agente único no puede:
Especialización: Un worker de análisis de seguridad tiene un system prompt y herramientas completamente distintas a uno de redacción. Cada uno puede ser optimizado para su tarea específica.
Paralelismo: Tareas independientes se ejecutan simultáneamente, reduciendo la latencia total del sistema. Analizar 50 archivos en paralelo es 10x más rápido que secuencialmente.
Aislamiento de errores: Si un worker falla, los demás continúan. El orquestador decide si el error es fatal o si puede continuar con resultados parciales.
Mantenibilidad: Cada worker es un componente independiente, testeable en aislamiento. Cambiar la lógica de un worker no afecta a los demás.
Control de contexto: Cada agente tiene su propio contexto. El orquestador decide qué información comparte con cada worker, evitando que el contexto global se llene con información irrelevante.
2. Configuración de subagentes: ClaudeCodeOptions
Cada subagente es una llamada a query() con su propio conjunto de opciones. Puedes controlar herramientas disponibles, modelo, system prompt, y más.
Campos clave de ClaudeCodeOptions
# Python
from claude_code_sdk import query, ClaudeCodeOptions
opciones_worker = ClaudeCodeOptions(
# Modelo a usar para este subagente
model="claude-opus-4-5",
# Herramientas disponibles SOLO para este agente
allowed_tools=["Read", "Glob", "Grep"],
# Herramientas completamente bloqueadas
disallowed_tools=["Bash", "Write", "Edit"],
# System prompt especializado
system_prompt="""Eres un agente de análisis de seguridad.
Solo puedes LEER archivos. Nunca modifiques nada.
Tu output debe ser JSON con campos: vulnerabilidades, severidad, recomendaciones.""",
# Máximo número de turnos (límite de seguridad)
max_turns=10,
# Directorio de trabajo
cwd="/ruta/al/proyecto",
# Variables de entorno disponibles
allowed_env_vars=["PATH", "NODE_ENV"],
)
// TypeScript
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const opcionesWorker: ClaudeCodeOptions = {
model: "claude-opus-4-5",
allowedTools: ["Read", "Glob", "Grep"],
disallowedTools: ["Bash", "Write", "Edit"],
systemPrompt: `Eres un agente de análisis de seguridad.
Solo puedes LEER archivos. Nunca modifiques nada.
Tu output debe ser JSON con campos: vulnerabilidades, severidad, recomendaciones.`,
maxTurns: 10,
cwd: "/ruta/al/proyecto",
};
Campos importantes explicados
allowed_tools: Lista blanca de herramientas. Si está definida, el agente SOLO puede usar esas herramientas. Útil para workers especializados que no deben tener acceso completo.
disallowed_tools: Lista negra. Bloquea herramientas específicas sin importar qué más esté disponible. Útil para agregar restricciones a un conjunto base.
system_prompt: Define la personalidad, rol y restricciones del agente. Un buen system prompt para un subagente debe ser:
- Conciso y específico al rol
- Definir el formato de output esperado
- Indicar qué NO debe hacer
- Incluir criterios de éxito claros
max_turns: Número máximo de vueltas de conversación. Fundamental para evitar agentes que se queden en bucles infinitos. En producción, siempre definir este valor.
cwd: Directorio de trabajo. Cada worker puede operar en un directorio diferente si es necesario.
Estrategia de selección de modelo por worker
No todos los workers necesitan el modelo más potente. Usar el modelo correcto para cada tarea optimiza costos y velocidad:
from claude_code_sdk import query, ClaudeCodeOptions
# Worker de exploración: tarea simple, modelo liviano
opciones_explorer = ClaudeCodeOptions(
model="claude-haiku-4-5", # Más rápido y barato
allowed_tools=["Glob", "Grep"],
max_turns=5,
system_prompt="Lista archivos relevantes. Retorna JSON array de rutas."
)
# Worker de análisis: razonamiento complejo, modelo potente
opciones_analyzer = ClaudeCodeOptions(
model="claude-opus-4-5", # Más capaz para análisis profundo
allowed_tools=["Read", "Grep"],
max_turns=15,
system_prompt="Analiza la arquitectura del código. Detecta problemas de diseño."
)
# Worker de formato: tarea mecánica, modelo liviano
opciones_formatter = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=[], # Sin herramientas — solo transforma texto
max_turns=3,
system_prompt="Formatea el análisis como Markdown estructurado."
)
// TypeScript equivalente
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const opcionesExplorer: ClaudeCodeOptions = {
model: "claude-haiku-4-5",
allowedTools: ["Glob", "Grep"],
maxTurns: 5,
systemPrompt: "Lista archivos relevantes. Retorna JSON array de rutas.",
};
const opcionesAnalyzer: ClaudeCodeOptions = {
model: "claude-opus-4-5",
allowedTools: ["Read", "Grep"],
maxTurns: 15,
systemPrompt: "Analiza la arquitectura del código. Detecta problemas de diseño.",
};
const opcionesFormatter: ClaudeCodeOptions = {
model: "claude-haiku-4-5",
allowedTools: [],
maxTurns: 3,
systemPrompt: "Formatea el análisis como Markdown estructurado.",
};
3. Patrones de orquestación
Existen cuatro patrones principales para organizar sistemas multi-agente:
graph TD
subgraph "Hub-and-Spoke"
H[Hub/Coordinador]
H --> S1[Spoke 1]
H --> S2[Spoke 2]
H --> S3[Spoke 3]
end
subgraph "Pipeline"
P1[Etapa 1] --> P2[Etapa 2] --> P3[Etapa 3]
end
subgraph "Fan-out/Fan-in"
FO[Fan-out]
FO --> FW1[Worker 1]
FO --> FW2[Worker 2]
FO --> FW3[Worker 3]
FW1 --> FI[Fan-in/Aggregator]
FW2 --> FI
FW3 --> FI
end
subgraph "Jerárquico"
JO[Orquestador]
JO --> JM1[Manager 1]
JO --> JM2[Manager 2]
JM1 --> JW1[Worker 1A]
JM1 --> JW2[Worker 1B]
JM2 --> JW3[Worker 2A]
end
Comparación de patrones
| Patrón | Paralelismo | Complejidad | Mejor para |
|---|---|---|---|
| Hub-and-Spoke | Opcional | Media | Workers especializados, tareas independientes |
| Pipeline | No (secuencial) | Baja | Transformaciones encadenadas |
| Fan-out/Fan-in | Máximo | Alta | Análisis de múltiples archivos, scraping |
| Jerárquico | Parcial | Muy Alta | Proyectos grandes con sub-equipos |
Cuándo usar cada patrón
Hub-and-Spoke: Cuando tienes tareas claramente diferenciadas por dominio (búsqueda, análisis, redacción) y el coordinador necesita sintetizar perspectivas diferentes.
Pipeline: Cuando cada paso transforma el output del anterior. Ideal para ETL, procesamiento de datos, o generación de contenido en etapas.
Fan-out/Fan-in: Cuando tienes un conjunto de items del mismo tipo (archivos, URLs, registros) que deben procesarse individualmente y luego consolidarse.
Jerárquico: Para proyectos muy grandes donde diferentes “equipos” de agentes trabajan en dominios separados con sus propios coordinadores.
4. Patrón Hub-and-Spoke
El coordinador (hub) recibe la tarea principal, la descompone, y delega fragmentos a workers especializados (spokes). Cada spoke tiene herramientas y prompts optimizados para su función.
sequenceDiagram
participant App as Aplicación
participant Hub as Coordinador
participant RW as Worker Researcher
participant AW as Worker Analyst
participant WW as Worker Writer
App->>Hub: "Análisis completo del proyecto X"
Hub->>RW: "Encuentra todos los archivos de configuración"
Hub->>AW: "Analiza la arquitectura del código fuente"
RW-->>Hub: Lista de configs encontradas
AW-->>Hub: Análisis de arquitectura
Hub->>WW: "Genera reporte con estos hallazgos"
WW-->>Hub: Reporte en Markdown
Hub-->>App: Reporte final consolidado
Implementación completa en Python
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass
from typing import Optional
@dataclass
class ResultadoWorker:
nombre: str
resultado: str
exito: bool
error: Optional[str] = None
async def worker_investigacion(directorio: str) -> ResultadoWorker:
"""Worker especializado en explorar la estructura del proyecto."""
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5", # Modelo liviano para exploración
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un agente de exploración de código.
Tu tarea es catalogar la estructura del proyecto.
Retorna SOLO un JSON con este formato exacto:
{
"archivos_principales": ["ruta1", "ruta2"],
"lenguajes": ["Python", "TypeScript"],
"frameworks": ["FastAPI", "React"],
"configuraciones": ["package.json", "pyproject.toml"],
"tests": "sí/no",
"documentacion": "sí/no"
}""",
max_turns=8,
cwd=directorio,
)
try:
resultado_texto = ""
async for mensaje in query(
prompt=f"Explora el proyecto en {directorio} y retorna el JSON de estructura.",
options=opciones
):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado_texto = bloque.text
return ResultadoWorker(
nombre="investigacion",
resultado=resultado_texto,
exito=True
)
except Exception as e:
return ResultadoWorker(
nombre="investigacion",
resultado="",
exito=False,
error=str(e)
)
async def worker_analisis(directorio: str, contexto: str) -> ResultadoWorker:
"""Worker especializado en análisis de calidad de código."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5", # Modelo más potente para análisis
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un experto en análisis de calidad de código.
Analiza el código fuente y detecta:
- Problemas de arquitectura
- Code smells
- Deuda técnica
- Violaciones de SOLID
- Riesgos de seguridad obvios
Retorna un análisis estructurado en Markdown con secciones claras.""",
max_turns=15,
cwd=directorio,
)
try:
resultado_texto = ""
async for mensaje in query(
prompt=f"Basado en esta estructura: {contexto}\n\nAnaliza la calidad del código.",
options=opciones
):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado_texto = bloque.text
return ResultadoWorker(
nombre="analisis",
resultado=resultado_texto,
exito=True
)
except Exception as e:
return ResultadoWorker(
nombre="analisis",
resultado="",
exito=False,
error=str(e)
)
async def worker_redactor(hallazgos: dict) -> ResultadoWorker:
"""Worker especializado en generar reportes ejecutivos."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[], # Sin herramientas — solo genera texto
system_prompt="""Eres un experto en comunicación técnica.
Conviertes análisis técnicos en reportes claros y accionables.
El reporte debe tener:
1. Resumen ejecutivo (3-5 oraciones)
2. Hallazgos principales (lista priorizada)
3. Recomendaciones inmediatas
4. Recomendaciones a largo plazo
5. Métricas de salud del proyecto
Usa Markdown con encabezados, listas y tablas donde corresponda.""",
max_turns=5,
)
prompt = f"""Genera un reporte ejecutivo basado en estos hallazgos:
## Estructura del Proyecto
{hallazgos.get('investigacion', 'No disponible')}
## Análisis de Calidad
{hallazgos.get('analisis', 'No disponible')}
"""
try:
resultado_texto = ""
async for mensaje in query(prompt=prompt, options=opciones):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado_texto = bloque.text
return ResultadoWorker(
nombre="redactor",
resultado=resultado_texto,
exito=True
)
except Exception as e:
return ResultadoWorker(
nombre="redactor",
resultado="",
exito=False,
error=str(e)
)
async def coordinador_hub(directorio: str) -> str:
"""
Coordinador principal que orquesta todos los workers.
Patrón: Hub-and-Spoke con ejecución parcialmente paralela.
"""
print(f"[Hub] Iniciando análisis de: {directorio}")
# FASE 1: Investigación (primer paso, provee contexto para análisis)
print("[Hub] Lanzando worker de investigación...")
res_investigacion = await worker_investigacion(directorio)
if not res_investigacion.exito:
return f"Error en investigación: {res_investigacion.error}"
print(f"[Hub] Investigación completada. Lanzando análisis...")
# FASE 2: Análisis usa el contexto de investigación
res_analisis = await worker_analisis(directorio, res_investigacion.resultado)
if not res_analisis.exito:
print(f"[Hub] Advertencia: análisis falló: {res_analisis.error}")
res_analisis.resultado = "Análisis no disponible"
# FASE 3: Redacción consolida todo
print("[Hub] Generando reporte final...")
hallazgos = {
"investigacion": res_investigacion.resultado,
"analisis": res_analisis.resultado,
}
res_reporte = await worker_redactor(hallazgos)
if not res_reporte.exito:
return f"Error generando reporte: {res_reporte.error}"
print("[Hub] Análisis completo!")
return res_reporte.resultado
# Uso
if __name__ == "__main__":
reporte = asyncio.run(coordinador_hub("/home/usuario/mi-proyecto"))
print(reporte)
Implementación en TypeScript
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface WorkerResult {
name: string;
result: string;
success: boolean;
error?: string;
}
async function workerInvestigacion(directorio: string): Promise<WorkerResult> {
const opciones: ClaudeCodeOptions = {
model: "claude-haiku-4-5",
allowedTools: ["Read", "Glob", "Grep"],
disallowedTools: ["Bash", "Write", "Edit"],
systemPrompt: `Eres un agente de exploración de código.
Cataloga la estructura del proyecto en JSON con campos:
archivos_principales, lenguajes, frameworks, configuraciones, tests, documentacion.`,
maxTurns: 8,
cwd: directorio,
};
try {
let resultado = "";
for await (const mensaje of query({
prompt: `Explora el proyecto en ${directorio}`,
options: opciones,
})) {
if (mensaje.type === "assistant" && mensaje.message.content) {
for (const bloque of mensaje.message.content) {
if (bloque.type === "text") {
resultado = bloque.text;
}
}
}
}
return { name: "investigacion", result: resultado, success: true };
} catch (e) {
return {
name: "investigacion",
result: "",
success: false,
error: String(e),
};
}
}
async function workerAnalisis(
directorio: string,
contexto: string
): Promise<WorkerResult> {
const opciones: ClaudeCodeOptions = {
model: "claude-opus-4-5",
allowedTools: ["Read", "Glob", "Grep"],
disallowedTools: ["Bash", "Write", "Edit"],
systemPrompt: `Eres un experto en análisis de calidad de código.
Detecta problemas de arquitectura, code smells, deuda técnica y riesgos de seguridad.
Retorna análisis estructurado en Markdown.`,
maxTurns: 15,
cwd: directorio,
};
try {
let resultado = "";
for await (const mensaje of query({
prompt: `Basado en esta estructura:\n${contexto}\n\nAnaliza la calidad del código.`,
options: opciones,
})) {
if (mensaje.type === "assistant" && mensaje.message.content) {
for (const bloque of mensaje.message.content) {
if (bloque.type === "text") {
resultado = bloque.text;
}
}
}
}
return { name: "analisis", result: resultado, success: true };
} catch (e) {
return { name: "analisis", result: "", success: false, error: String(e) };
}
}
async function coordinadorHub(directorio: string): Promise<string> {
console.log(`[Hub] Iniciando análisis de: ${directorio}`);
// Paso 1: Investigación
const resInvestigacion = await workerInvestigacion(directorio);
if (!resInvestigacion.success) {
return `Error: ${resInvestigacion.error}`;
}
// Paso 2: Análisis con contexto de investigación
const resAnalisis = await workerAnalisis(
directorio,
resInvestigacion.result
);
const contextoAnalisis = resAnalisis.success
? resAnalisis.result
: "Análisis no disponible";
// Paso 3: Redacción final
const opciones: ClaudeCodeOptions = {
model: "claude-opus-4-5",
allowedTools: [],
systemPrompt: "Genera reporte ejecutivo en Markdown con hallazgos y recomendaciones.",
maxTurns: 5,
};
let reporte = "";
for await (const mensaje of query({
prompt: `Genera reporte con:\n## Estructura\n${resInvestigacion.result}\n## Análisis\n${contextoAnalisis}`,
options: opciones,
})) {
if (mensaje.type === "assistant" && mensaje.message.content) {
for (const bloque of mensaje.message.content) {
if (bloque.type === "text") reporte = bloque.text;
}
}
}
return reporte;
}
// Uso
coordinadorHub("/home/usuario/mi-proyecto").then(console.log);
5. Patrón Pipeline
En el patrón pipeline, cada agente transforma el output del agente anterior. El output de uno es el input del siguiente. Ideal para procesos de transformación lineales.
graph LR
I[Input Raw] --> A1[Agente 1: Extractor]
A1 -->|datos extraídos| A2[Agente 2: Normalizador]
A2 -->|datos normalizados| A3[Agente 3: Enriquecedor]
A3 -->|datos enriquecidos| A4[Agente 4: Formateador]
A4 --> O[Output Final]
Implementación Python: Pipeline de análisis de logs
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Any
async def ejecutar_etapa_pipeline(
nombre_etapa: str,
prompt: str,
input_datos: str,
system_prompt: str,
model: str = "claude-haiku-4-5",
max_turns: int = 5
) -> str:
"""Ejecuta una etapa del pipeline y retorna su output."""
opciones = ClaudeCodeOptions(
model=model,
allowed_tools=[], # Pipeline de texto puro — sin herramientas
system_prompt=system_prompt,
max_turns=max_turns,
)
prompt_completo = f"{prompt}\n\n## Input:\n{input_datos}"
resultado = ""
async for mensaje in query(prompt=prompt_completo, options=opciones):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
print(f"[Pipeline] Etapa '{nombre_etapa}' completada ({len(resultado)} chars)")
return resultado
async def pipeline_analisis_logs(logs_raw: str) -> dict:
"""
Pipeline de 4 etapas para analizar logs de aplicación.
Cada etapa transforma el output de la anterior.
"""
# Etapa 1: Extracción — identifica eventos importantes
logs_estructurados = await ejecutar_etapa_pipeline(
nombre_etapa="extraccion",
prompt="Extrae y estructura los eventos importantes de estos logs.",
input_datos=logs_raw,
system_prompt="""Eres un parser de logs. Extrae eventos en JSON:
[{"timestamp": "...", "nivel": "ERROR|WARN|INFO", "mensaje": "...", "servicio": "..."}]
Incluye SOLO eventos ERROR y WARN. Retorna SOLO el JSON array.""",
model="claude-haiku-4-5"
)
# Etapa 2: Clasificación — agrupa por tipo de problema
logs_clasificados = await ejecutar_etapa_pipeline(
nombre_etapa="clasificacion",
prompt="Clasifica estos eventos por categoría de problema.",
input_datos=logs_estructurados,
system_prompt="""Eres un clasificador de incidentes.
Agrupa los eventos en categorías: database, network, auth, performance, unknown.
Retorna JSON: {"categoria": [lista_de_eventos]}""",
model="claude-haiku-4-5"
)
# Etapa 3: Análisis de causa raíz
analisis_causas = await ejecutar_etapa_pipeline(
nombre_etapa="analisis_causas",
prompt="Identifica causas raíz y patrones en estos incidentes clasificados.",
input_datos=logs_clasificados,
system_prompt="""Eres un experto en análisis de incidentes de producción.
Identifica:
1. Causa raíz probable de cada categoría
2. Patrones temporales (¿se repite cada X tiempo?)
3. Correlaciones entre categorías
4. Impacto estimado en usuarios
Sé específico y accionable.""",
model="claude-opus-4-5",
max_turns=8
)
# Etapa 4: Generación de recomendaciones
recomendaciones = await ejecutar_etapa_pipeline(
nombre_etapa="recomendaciones",
prompt="Genera recomendaciones de remediación para este análisis.",
input_datos=analisis_causas,
system_prompt="""Eres un arquitecto de sistemas senior.
Genera recomendaciones priorizadas:
- P0: Crítico, acción inmediata (menos de 1 hora)
- P1: Alto, acción hoy (menos de 24 horas)
- P2: Medio, planificar esta semana
- P3: Bajo, backlog técnico
Para cada recomendación: descripción, pasos concretos, esfuerzo estimado (S/M/L).""",
model="claude-opus-4-5",
max_turns=8
)
return {
"logs_estructurados": logs_estructurados,
"clasificacion": logs_clasificados,
"analisis_causas": analisis_causas,
"recomendaciones": recomendaciones,
}
# Uso del pipeline
async def main():
logs = """
2026-03-23 10:15:32 ERROR [database] Connection timeout after 30s: host=db-primary
2026-03-23 10:15:45 ERROR [database] Max connections reached: pool_size=100
2026-03-23 10:16:01 WARN [api] Response time > 5000ms: endpoint=/api/users
2026-03-23 10:16:15 ERROR [auth] JWT validation failed: token expired
2026-03-23 10:16:20 ERROR [database] Connection timeout after 30s: host=db-primary
"""
resultado = await pipeline_analisis_logs(logs)
print("=== RECOMENDACIONES FINALES ===")
print(resultado["recomendaciones"])
asyncio.run(main())
Pipeline en TypeScript: transformación de datos
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface EtapaPipeline {
nombre: string;
systemPrompt: string;
model?: string;
maxTurns?: number;
}
async function ejecutarEtapa(
etapa: EtapaPipeline,
inputDatos: string
): Promise<string> {
const opciones: ClaudeCodeOptions = {
model: etapa.model ?? "claude-haiku-4-5",
allowedTools: [],
systemPrompt: etapa.systemPrompt,
maxTurns: etapa.maxTurns ?? 5,
};
let resultado = "";
for await (const mensaje of query({
prompt: `Procesa este input:\n\n${inputDatos}`,
options: opciones,
})) {
if (mensaje.type === "assistant" && mensaje.message.content) {
for (const bloque of mensaje.message.content) {
if (bloque.type === "text") resultado = bloque.text;
}
}
}
console.log(`[Pipeline] Etapa '${etapa.nombre}' completada (${resultado.length} chars)`);
return resultado;
}
async function ejecutarPipeline(
input: string,
etapas: EtapaPipeline[]
): Promise<string> {
let datosActuales = input;
for (const etapa of etapas) {
datosActuales = await ejecutarEtapa(etapa, datosActuales);
}
return datosActuales;
}
// Configuración del pipeline de análisis de código
const pipelineRevisionCodigo: EtapaPipeline[] = [
{
nombre: "extraccion",
systemPrompt: "Extrae las funciones y clases del código. Retorna lista estructurada.",
model: "claude-haiku-4-5",
maxTurns: 3,
},
{
nombre: "analisis",
systemPrompt: "Analiza problemas de calidad en las funciones extraídas. Detecta code smells.",
model: "claude-opus-4-5",
maxTurns: 8,
},
{
nombre: "sugerencias",
systemPrompt: "Genera sugerencias de refactoring específicas para los problemas detectados.",
model: "claude-opus-4-5",
maxTurns: 8,
},
{
nombre: "reporte",
systemPrompt: "Formatea el análisis como reporte Markdown con secciones claras.",
model: "claude-haiku-4-5",
maxTurns: 3,
},
];
// Uso
const codigoAAnalizar = `
def procesar(d, f, x):
r = []
for i in d:
if f(i):
r.append(x(i))
return r
`;
ejecutarPipeline(codigoAAnalizar, pipelineRevisionCodigo).then(console.log);
6. Patrón Fan-out / Fan-in
El patrón fan-out lanza múltiples workers en paralelo para procesar diferentes piezas del mismo problema, luego un fan-in consolida todos los resultados.
graph TD
T[Tarea: Analizar 50 archivos]
T --> B[Batch Splitter]
B --> W1[Worker: archivos 1-10]
B --> W2[Worker: archivos 11-20]
B --> W3[Worker: archivos 21-30]
B --> W4[Worker: archivos 31-40]
B --> W5[Worker: archivos 41-50]
W1 --> AG[Aggregator / Fan-in]
W2 --> AG
W3 --> AG
W4 --> AG
W5 --> AG
AG --> R[Reporte consolidado]
Implementación Python: Análisis paralelo de repositorio
import asyncio
import json
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
from typing import List
async def analizar_archivo(ruta: str) -> dict:
"""Analiza un archivo individual buscando problemas."""
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5", # Haiku para velocidad y costo
allowed_tools=["Read"],
disallowed_tools=["Bash", "Write", "Edit", "Glob", "Grep"],
system_prompt="""Eres un revisor de código. Analiza el archivo y retorna JSON:
{
"archivo": "ruta",
"problemas": [{"tipo": "bug|style|security|performance", "linea": N, "descripcion": "..."}],
"puntuacion": 0-10,
"resumen": "una oración"
}
Si el archivo está bien, retorna problemas: []""",
max_turns=3, # Límite bajo — es una tarea simple
)
try:
resultado = ""
async for mensaje in query(
prompt=f"Analiza este archivo: {ruta}",
options=opciones
):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
# Intentar parsear el JSON del resultado
try:
inicio = resultado.find('{')
fin = resultado.rfind('}') + 1
if inicio >= 0 and fin > inicio:
return json.loads(resultado[inicio:fin])
except json.JSONDecodeError:
pass
return {
"archivo": ruta,
"problemas": [],
"puntuacion": 5,
"resumen": resultado[:100]
}
except Exception as e:
return {
"archivo": ruta,
"error": str(e),
"problemas": [],
"puntuacion": 0
}
async def analizar_batch(archivos: List[str]) -> List[dict]:
"""Analiza un batch de archivos en paralelo (fan-out dentro del batch)."""
tareas = [analizar_archivo(arch) for arch in archivos]
resultados = await asyncio.gather(*tareas, return_exceptions=True)
validos = []
for r in resultados:
if isinstance(r, Exception):
print(f"[Fan-out] Error en batch: {r}")
else:
validos.append(r)
return validos
async def fan_out_analisis(directorio: str, tamano_batch: int = 5) -> dict:
"""
Ejecuta análisis de todos los archivos Python en paralelo.
Fan-out: divide en batches paralelos.
Fan-in: consolida resultados.
"""
# Encontrar todos los archivos Python
archivos = [
str(p) for p in Path(directorio).glob("**/*.py")
if "__pycache__" not in str(p)
]
print(f"[Fan-out] Analizando {len(archivos)} archivos en batches de {tamano_batch}")
# Dividir en batches
batches = [
archivos[i:i+tamano_batch]
for i in range(0, len(archivos), tamano_batch)
]
# Fan-out: ejecutar todos los batches en paralelo
tareas_batch = [analizar_batch(batch) for batch in batches]
resultados_batches = await asyncio.gather(*tareas_batch)
# Fan-in: consolidar todos los resultados
todos_resultados = []
for batch_resultado in resultados_batches:
todos_resultados.extend(batch_resultado)
# Calcular métricas agregadas
total_problemas = sum(len(r.get("problemas", [])) for r in todos_resultados)
archivos_con_problemas = [r for r in todos_resultados if r.get("problemas")]
puntuacion_promedio = (
sum(r.get("puntuacion", 5) for r in todos_resultados) / len(todos_resultados)
if todos_resultados else 0
)
# Clasificar problemas por tipo
por_tipo: dict = {"bug": [], "security": [], "performance": [], "style": []}
for resultado in todos_resultados:
for problema in resultado.get("problemas", []):
tipo = problema.get("tipo", "style")
if tipo in por_tipo:
por_tipo[tipo].append({
"archivo": resultado["archivo"],
**problema
})
return {
"total_archivos": len(archivos),
"archivos_analizados": len(todos_resultados),
"total_problemas": total_problemas,
"puntuacion_promedio": round(puntuacion_promedio, 1),
"archivos_con_problemas": len(archivos_con_problemas),
"problemas_por_tipo": {k: len(v) for k, v in por_tipo.items()},
"problemas_criticos": por_tipo["bug"] + por_tipo["security"],
"detalles": todos_resultados,
}
# Uso
async def main():
resultado = await fan_out_analisis("/mi/proyecto", tamano_batch=5)
print(f"Análisis completado:")
print(f" Archivos: {resultado['total_archivos']}")
print(f" Puntuación: {resultado['puntuacion_promedio']}/10")
print(f" Problemas críticos: {len(resultado['problemas_criticos'])}")
asyncio.run(main())
Fan-out con semáforo para controlar concurrencia
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Callable, Any
async def fan_out_controlado(
items: list,
procesador: Callable,
max_concurrente: int = 3
) -> list:
"""
Fan-out con límite de concurrencia.
Evita sobrecargar la API o el sistema de archivos.
"""
semaforo = asyncio.Semaphore(max_concurrente)
async def procesar_con_semaforo(item: Any):
async with semaforo:
return await procesador(item)
tareas = [procesar_con_semaforo(item) for item in items]
return await asyncio.gather(*tareas, return_exceptions=True)
# Uso
async def main():
archivos = ["archivo1.py", "archivo2.py", "archivo3.py", "archivo4.py", "archivo5.py"]
# Máximo 3 agentes concurrentes para no sobrecargar la API
resultados = await fan_out_controlado(
archivos,
analizar_archivo,
max_concurrente=3
)
print(f"Procesados: {len(resultados)} archivos")
asyncio.run(main())
Fan-out en TypeScript con Promise.allSettled
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface ResultadoArchivo {
archivo: string;
problemas: Array<{ tipo: string; linea: number; descripcion: string }>;
puntuacion: number;
resumen: string;
}
async function analizarArchivo(ruta: string): Promise<ResultadoArchivo> {
const opciones: ClaudeCodeOptions = {
model: "claude-haiku-4-5",
allowedTools: ["Read"],
disallowedTools: ["Bash", "Write", "Edit"],
systemPrompt: `Analiza el archivo y retorna JSON:
{
"archivo": "ruta",
"problemas": [{"tipo": "bug|style|security|performance", "linea": N, "descripcion": "..."}],
"puntuacion": 0-10,
"resumen": "una oración"
}`,
maxTurns: 3,
};
let resultado = "";
for await (const mensaje of query({
prompt: `Analiza: ${ruta}`,
options: opciones,
})) {
if (mensaje.type === "assistant" && mensaje.message.content) {
for (const bloque of mensaje.message.content) {
if (bloque.type === "text") resultado = bloque.text;
}
}
}
try {
const inicio = resultado.indexOf("{");
const fin = resultado.lastIndexOf("}") + 1;
if (inicio >= 0) return JSON.parse(resultado.slice(inicio, fin));
} catch {}
return { archivo: ruta, problemas: [], puntuacion: 5, resumen: resultado };
}
async function fanOutAnalisis(archivos: string[]): Promise<{
exitosos: ResultadoArchivo[];
fallidos: string[];
}> {
// Promise.allSettled nunca rechaza — maneja errores individuales
const resultados = await Promise.allSettled(
archivos.map((archivo) => analizarArchivo(archivo))
);
const exitosos: ResultadoArchivo[] = [];
const fallidos: string[] = [];
resultados.forEach((resultado, index) => {
if (resultado.status === "fulfilled") {
exitosos.push(resultado.value);
} else {
fallidos.push(archivos[index]);
console.error(`Error en ${archivos[index]}:`, resultado.reason);
}
});
return { exitosos, fallidos };
}
// Uso
const archivos = ["src/auth.ts", "src/db.ts", "src/api.ts"];
fanOutAnalisis(archivos).then(({ exitosos, fallidos }) => {
console.log(`Exitosos: ${exitosos.length}, Fallidos: ${fallidos.length}`);
});
7. Subagentes con aislamiento: worktrees Git
Cuando múltiples agentes trabajan sobre el mismo repositorio Git simultáneamente, pueden entrar en conflicto. El parámetro de aislamiento via worktree crea una copia del repositorio en rama separada para cada agente.
graph TD
R[Repositorio Git Principal]
R --> WT1[Worktree 1: rama agent-fix-auth]
R --> WT2[Worktree 2: rama agent-refactor-db]
R --> WT3[Worktree 3: rama agent-add-tests]
WT1 --> A1[Agente 1: Arregla autenticación]
WT2 --> A2[Agente 2: Refactoriza base de datos]
WT3 --> A3[Agente 3: Agrega tests]
A1 --> PR1[Pull Request 1]
A2 --> PR2[Pull Request 2]
A3 --> PR3[Pull Request 3]
Cómo funciona el worktree
Un Git worktree permite tener múltiples checkouts del mismo repositorio en diferentes directorios, cada uno en una rama diferente. Los archivos son independientes pero comparten el objeto de base de datos Git.
import asyncio
import subprocess
import tempfile
import os
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
class WorktreeManager:
"""Gestiona la creación y limpieza de worktrees Git."""
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path)
def crear_worktree(self, rama: str) -> str:
"""Crea un worktree en un directorio temporal."""
worktree_dir = tempfile.mkdtemp(prefix=f"agent-{rama}-")
# Crear nueva rama desde HEAD
subprocess.run(
["git", "worktree", "add", "-b", rama, worktree_dir],
cwd=self.repo_path,
check=True,
capture_output=True
)
return worktree_dir
def limpiar_worktree(self, worktree_dir: str, rama: str):
"""Elimina el worktree y su rama."""
subprocess.run(
["git", "worktree", "remove", "--force", worktree_dir],
cwd=self.repo_path,
capture_output=True
)
subprocess.run(
["git", "branch", "-D", rama],
cwd=self.repo_path,
capture_output=True
)
async def agente_en_worktree_aislado(
repo_path: str,
tarea: str,
rama: str
) -> dict:
"""Ejecuta un agente en un worktree Git completamente aislado."""
manager = WorktreeManager(repo_path)
worktree_dir = None
try:
# Crear worktree aislado
worktree_dir = manager.crear_worktree(rama)
print(f"[Worktree] Creado worktree en: {worktree_dir} (rama: {rama})")
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"],
system_prompt=f"""Trabajas en una rama aislada: {rama}.
Puedes hacer todos los cambios necesarios.
Al finalizar, ejecuta: git add -A && git commit -m "feat: {tarea[:50]}"
No hagas push — solo commit local.""",
max_turns=20,
cwd=worktree_dir,
)
resultado = ""
async for mensaje in query(prompt=tarea, options=opciones):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
return {
"rama": rama,
"worktree_dir": worktree_dir,
"resultado": resultado,
"exito": True
}
except Exception as e:
return {
"rama": rama,
"worktree_dir": worktree_dir,
"resultado": "",
"exito": False,
"error": str(e)
}
# Nota: no limpiar aquí — el caller decide si hace merge o descarta
async def agentes_paralelos_aislados(
repo_path: str,
tareas: list
) -> list:
"""Lanza múltiples agentes en worktrees aislados simultáneamente."""
coroutines = [
agente_en_worktree_aislado(
repo_path,
tarea["descripcion"],
tarea["rama"]
)
for tarea in tareas
]
resultados = await asyncio.gather(*coroutines, return_exceptions=True)
# Filtrar excepciones
return [
r if not isinstance(r, Exception) else {"error": str(r), "exito": False}
for r in resultados
]
# Uso
async def main():
tareas = [
{
"descripcion": "Arregla el bug de autenticación en auth/middleware.py",
"rama": "fix/auth-bug"
},
{
"descripcion": "Refactoriza la capa de base de datos para usar Repository Pattern",
"rama": "refactor/db-layer"
},
{
"descripcion": "Agrega tests unitarios para el módulo de pagos",
"rama": "feat/payment-tests"
},
]
resultados = await agentes_paralelos_aislados("/mi/repo", tareas)
for r in resultados:
if r.get("exito"):
print(f"[OK] Rama {r['rama']}: cambios listos para review")
else:
print(f"[Error] {r.get('rama', 'desconocida')}: {r.get('error')}")
asyncio.run(main())
8. Comunicación entre agentes
Los subagentes no se comunican directamente entre sí — toda comunicación pasa por el orquestador. Este patrón evita acoplamientos directos y hace el sistema más fácil de depurar.
sequenceDiagram
participant O as Orquestador
participant W1 as Worker 1
participant W2 as Worker 2
participant Store as Estado Compartido
O->>W1: Tarea A con contexto inicial
W1-->>O: Resultado A
O->>Store: Guardar resultado A
O->>W2: Tarea B con resultado A como contexto
W2-->>O: Resultado B
O->>Store: Guardar resultado B
Note over O: El orquestador decide qué<br/>información comparte con quién
Patrón: Estado compartido en memoria
from dataclasses import dataclass, field
from typing import Any, Dict
@dataclass
class EstadoOrquestador:
"""Estado compartido gestionado por el orquestador."""
contexto: Dict[str, Any] = field(default_factory=dict)
resultados: Dict[str, str] = field(default_factory=dict)
errores: list = field(default_factory=list)
def agregar_resultado(self, worker: str, resultado: str):
self.resultados[worker] = resultado
def obtener_contexto_para(self, worker: str) -> str:
"""Construye el contexto relevante para un worker específico."""
partes = []
if worker == "analisis" and "investigacion" in self.resultados:
partes.append(f"Estructura del proyecto:\n{self.resultados['investigacion']}")
elif worker == "redactor":
for k, v in self.resultados.items():
partes.append(f"## {k.title()}\n{v}")
return "\n\n".join(partes)
def hay_errores_criticos(self) -> bool:
"""Determina si algún error impide continuar."""
return any(e.get("critico", False) for e in self.errores)
async def orquestador_con_estado(directorio: str) -> str:
"""Orquestador que mantiene estado explícito entre workers."""
estado = EstadoOrquestador()
# Worker 1 con contexto mínimo
res1 = await worker_investigacion(directorio)
if res1.exito:
estado.agregar_resultado("investigacion", res1.resultado)
else:
estado.errores.append({"worker": "investigacion", "error": res1.error, "critico": True})
if estado.hay_errores_criticos():
return "Pipeline abortado por error crítico"
# Worker 2 recibe contexto del worker 1
contexto_para_analisis = estado.obtener_contexto_para("analisis")
res2 = await worker_analisis(directorio, contexto_para_analisis)
if res2.exito:
estado.agregar_resultado("analisis", res2.resultado)
else:
# Error no crítico — continuar con lo que tenemos
estado.errores.append({"worker": "analisis", "error": res2.error, "critico": False})
estado.agregar_resultado("analisis", "Análisis parcial no disponible")
# Worker final recibe todo el contexto
res3 = await worker_redactor(estado.resultados)
return res3.resultado if res3.exito else "Error generando reporte final"
Patrón: Estado compartido con archivo temporal
Para sistemas más complejos donde los workers necesitan acceder a resultados de múltiples etapas previas, un archivo JSON temporal puede servir como “memoria” compartida:
import asyncio
import json
import tempfile
import os
from claude_code_sdk import query, ClaudeCodeOptions
class MemoriaCompartida:
"""Archivo JSON como estado compartido entre workers."""
def __init__(self):
self._archivo = tempfile.mktemp(suffix=".json")
self._guardar({})
def _guardar(self, datos: dict):
with open(self._archivo, 'w', encoding='utf-8') as f:
json.dump(datos, f, ensure_ascii=False, indent=2)
def _cargar(self) -> dict:
try:
with open(self._archivo, encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return {}
def escribir(self, clave: str, valor: Any):
datos = self._cargar()
datos[clave] = valor
self._guardar(datos)
def leer(self, clave: str, default: Any = None) -> Any:
return self._cargar().get(clave, default)
def limpiar(self):
if os.path.exists(self._archivo):
os.remove(self._archivo)
# Uso en el orquestador
memoria = MemoriaCompartida()
async def worker_que_lee_memoria(clave_input: str, clave_output: str) -> str:
"""Worker que lee contexto de la memoria compartida."""
contexto = memoria.leer(clave_input, "Sin contexto previo")
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=[],
max_turns=5,
system_prompt="Procesa el input y retorna resultado estructurado."
)
resultado = ""
async for m in query(
prompt=f"Procesa esto:\n{contexto}",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
memoria.escribir(clave_output, resultado)
return resultado
9. Manejo de errores en subagentes
Los errores en subagentes son inevitables en producción. El sistema debe manejarlos graciosamente: reintentar, usar fallbacks, o degradar funcionalidad.
flowchart TD
T[Tarea al Worker]
T --> E{¿Error?}
E -->|No| R[Resultado exitoso]
E -->|Sí| RE{¿Reintentable?}
RE -->|No — error fatal| F[Fallback / resultado parcial]
RE -->|Sí| RT{¿Intentos menor que max?}
RT -->|Sí| D[Esperar backoff exponencial]
D --> T
RT -->|No — agotado| F
F --> AG[Agregar al resultado con flag de error]
Implementación: Worker con reintentos y fallback
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Callable, Optional
class WorkerConReintentos:
"""Worker robusto con reintentos automáticos y fallback configurable."""
def __init__(
self,
max_reintentos: int = 3,
backoff_base: float = 1.0,
timeout: float = 60.0
):
self.max_reintentos = max_reintentos
self.backoff_base = backoff_base
self.timeout = timeout
async def ejecutar(
self,
prompt: str,
opciones: ClaudeCodeOptions,
fallback: Optional[Callable] = None
) -> str:
ultimo_error = None
for intento in range(self.max_reintentos):
try:
resultado = await asyncio.wait_for(
self._ejecutar_query(prompt, opciones),
timeout=self.timeout
)
return resultado
except asyncio.TimeoutError:
ultimo_error = f"Timeout después de {self.timeout}s"
print(f"[Worker] Intento {intento+1}: timeout")
except Exception as e:
ultimo_error = str(e)
print(f"[Worker] Intento {intento+1}: error — {e}")
# No reintentar ciertos errores fatales
if "invalid_api_key" in str(e).lower():
raise
# Backoff exponencial: 1s, 2s, 4s...
if intento < self.max_reintentos - 1:
espera = self.backoff_base * (2 ** intento)
print(f"[Worker] Esperando {espera}s antes de reintentar...")
await asyncio.sleep(espera)
# Agotados los reintentos
if fallback:
print(f"[Worker] Usando fallback después de {self.max_reintentos} intentos")
return await fallback(ultimo_error)
raise RuntimeError(
f"Worker falló después de {self.max_reintentos} intentos: {ultimo_error}"
)
async def _ejecutar_query(self, prompt: str, opciones: ClaudeCodeOptions) -> str:
resultado = ""
async for mensaje in query(prompt=prompt, options=opciones):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
return resultado
# Uso con fallback
worker = WorkerConReintentos(max_reintentos=3, timeout=30.0)
async def fallback_simple(error: str) -> str:
return json.dumps({
"error": "Worker no disponible",
"razon": error,
"resultado_parcial": None
})
async def analisis_con_resiliencia(archivo: str) -> str:
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=["Read"],
max_turns=5,
system_prompt="Analiza el archivo y reporta problemas."
)
return await worker.ejecutar(
prompt=f"Analiza {archivo}",
opciones=opciones,
fallback=fallback_simple
)
Circuit breaker para proteger el sistema
from enum import Enum
import time
class EstadoCircuit(Enum):
CERRADO = "cerrado" # Normal — peticiones fluyen
ABIERTO = "abierto" # Circuito roto — bloquea peticiones
SEMI_ABIERTO = "semi_abierto" # Probando si ya funciona
class CircuitBreaker:
"""Evita llamar a un worker que está fallando consistentemente."""
def __init__(
self,
umbral_fallas: int = 5,
tiempo_reset: float = 60.0
):
self.umbral_fallas = umbral_fallas
self.tiempo_reset = tiempo_reset
self.fallas_consecutivas = 0
self.ultimo_fallo = 0.0
self.estado = EstadoCircuit.CERRADO
def registrar_exito(self):
self.fallas_consecutivas = 0
self.estado = EstadoCircuit.CERRADO
def registrar_fallo(self):
self.fallas_consecutivas += 1
self.ultimo_fallo = time.time()
if self.fallas_consecutivas >= self.umbral_fallas:
self.estado = EstadoCircuit.ABIERTO
print(f"[CircuitBreaker] ABIERTO — demasiados fallos")
def puede_pasar(self) -> bool:
if self.estado == EstadoCircuit.CERRADO:
return True
if self.estado == EstadoCircuit.ABIERTO:
# Verificar si ya pasó el tiempo de reset
if time.time() - self.ultimo_fallo > self.tiempo_reset:
self.estado = EstadoCircuit.SEMI_ABIERTO
return True
return False
# SEMI_ABIERTO: dejar pasar una petición de prueba
return True
# Uso global del circuit breaker
cb_worker_analisis = CircuitBreaker(umbral_fallas=3, tiempo_reset=30.0)
async def analisis_con_circuit_breaker(archivo: str) -> Optional[str]:
if not cb_worker_analisis.puede_pasar():
print(f"[CB] Worker bloqueado por circuit breaker — usando fallback")
return None
try:
resultado = await analizar_archivo(archivo)
cb_worker_analisis.registrar_exito()
return resultado
except Exception as e:
cb_worker_analisis.registrar_fallo()
print(f"[CB] Error registrado: {e}")
return None
10. Límites y consideraciones
Profundidad máxima de anidación
El SDK no impone un límite técnico estricto de anidación, pero existen límites prácticos importantes:
graph TD
N0[Nivel 0: Tu aplicación]
N1[Nivel 1: Orquestador principal]
N2[Nivel 2: Manager / Coordinador de área]
N3[Nivel 3: Worker especializado]
N4[Nivel 4: Sub-worker — EVITAR]
STOP[Más niveles — NUNCA]
N0 --> N1 --> N2 --> N3 --> N4 --> STOP
style N4 fill:#ffa500,color:#000
style STOP fill:#ff6b6b,color:#fff
Regla práctica: Máximo 3 niveles de anidación. Cada nivel adicional:
- Multiplica el costo (cada nivel paga tokens de contexto por los niveles superiores)
- Aumenta la latencia total (cada nivel agrega tiempo de API)
- Hace el debugging exponencialmente más difícil
- Aumenta la probabilidad de errores en cascada
Modelo de costos
def estimar_costo_sistema_multiagente(config: dict) -> dict:
"""Estima el costo aproximado de un sistema multi-agente."""
precios = {
"claude-haiku-4-5": {"input": 0.00025, "output": 0.00125},
"claude-opus-4-5": {"input": 0.015, "output": 0.075},
"claude-sonnet-4-5": {"input": 0.003, "output": 0.015},
}
total_usd = 0.0
desglose = []
for worker in config.get("workers", []):
modelo = worker["modelo"]
tokens_input = worker["tokens_input_estimados"]
tokens_output = worker["tokens_output_estimados"]
cantidad = worker.get("cantidad", 1)
precio = precios.get(modelo, precios["claude-haiku-4-5"])
costo_unitario = (
tokens_input / 1000 * precio["input"] +
tokens_output / 1000 * precio["output"]
)
costo_total = costo_unitario * cantidad
desglose.append({
"worker": worker["nombre"],
"modelo": modelo,
"cantidad": cantidad,
"costo_unitario_usd": round(costo_unitario, 6),
"costo_total_usd": round(costo_total, 4),
})
total_usd += costo_total
return {
"total_usd": round(total_usd, 4),
"desglose": desglose,
}
# Ejemplo: estimar costo del fan-out de 50 archivos
config_fanout = {
"workers": [
{
"nombre": "worker_analisis",
"modelo": "claude-haiku-4-5",
"tokens_input_estimados": 3000,
"tokens_output_estimados": 500,
"cantidad": 50, # Un worker por archivo
},
{
"nombre": "worker_reporte",
"modelo": "claude-opus-4-5",
"tokens_input_estimados": 10000,
"tokens_output_estimados": 2000,
"cantidad": 1,
}
]
}
estimacion = estimar_costo_sistema_multiagente(config_fanout)
print(f"Costo estimado: ${estimacion['total_usd']} USD")
for item in estimacion['desglose']:
print(f" {item['worker']} x{item['cantidad']}: ${item['costo_total_usd']}")
Reglas prácticas de límites
| Parámetro | Recomendación | Razón |
|---|---|---|
max_turns | Máximo 20 por worker | Evita bucles infinitos |
| Workers simultáneos | Máximo 10 | Rate limits de API |
| Profundidad de anidación | Máximo 3 niveles | Debugging y costo |
| Timeout por worker | 30-120 segundos | Evita bloqueos |
| Tamaño de batch | 5-10 items | Balance paralelismo/costo |
| Contexto por worker | Bajo 100K tokens | Costo y velocidad |
11. Ejemplo completo: Sistema de investigación multi-agente
Este sistema implementa un investigador que busca, analiza y redacta usando tres workers especializados con manejo de errores completo.
import asyncio
import json
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Hallazgo:
titulo: str
contenido: str
fuente: str
relevancia: int # 1-10
@dataclass
class Investigacion:
tema: str
hallazgos: List[Hallazgo] = field(default_factory=list)
analisis: str = ""
reporte: str = ""
errores: List[str] = field(default_factory=list)
async def worker_buscador(tema: str, directorio: str) -> List[Hallazgo]:
"""Busca información relevante en el sistema de archivos."""
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=["Glob", "Grep", "Read"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt=f"""Eres un investigador experto en código.
Busca TODA la información disponible sobre: "{tema}"
Revisa archivos markdown, código, configuraciones y documentación.
Retorna un JSON array de hallazgos con este formato exacto:
[
{{
"titulo": "Nombre descriptivo del hallazgo",
"contenido": "Descripción detallada del contenido encontrado",
"fuente": "ruta/al/archivo.ext",
"relevancia": 8
}}
]
Incluye solo hallazgos con relevancia mayor o igual a 5.
Retorna SOLO el JSON array, sin texto adicional.""",
max_turns=15,
cwd=directorio,
)
try:
resultado = ""
async for mensaje in query(
prompt=f"Busca toda la información sobre '{tema}' en este proyecto.",
options=opciones
):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
# Parsear JSON de hallazgos
inicio = resultado.find('[')
fin = resultado.rfind(']') + 1
if inicio >= 0:
hallazgos_raw = json.loads(resultado[inicio:fin])
return [Hallazgo(**h) for h in hallazgos_raw]
except (json.JSONDecodeError, TypeError, KeyError, Exception) as e:
print(f"[Buscador] Error parseando hallazgos: {e}")
# Fallback: retornar hallazgo genérico
return [Hallazgo(
titulo="Búsqueda general",
contenido=resultado if resultado else "Sin resultados",
fuente="múltiples archivos",
relevancia=5
)]
async def worker_analizador(tema: str, hallazgos: List[Hallazgo]) -> str:
"""Analiza los hallazgos y extrae insights profundos."""
if not hallazgos:
return "No hay hallazgos para analizar."
hallazgos_texto = "\n\n".join([
f"### {h.titulo} (Relevancia: {h.relevancia}/10)\n"
f"Fuente: {h.fuente}\n{h.contenido}"
for h in sorted(hallazgos, key=lambda x: x.relevancia, reverse=True)
])
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt="""Eres un analista experto en sistemas de software.
Sintetizas información técnica compleja en insights accionables.
Analiza los hallazgos y produce un análisis estructurado con:
1. Patrones principales identificados
2. Contradicciones o inconsistencias
3. Brechas de información (qué falta saber)
4. Conclusiones clave con nivel de confianza
5. Riesgos identificados
Sé específico, técnico y concreto.""",
max_turns=5,
)
resultado = ""
async for mensaje in query(
prompt=f"Analiza estos hallazgos sobre '{tema}':\n\n{hallazgos_texto}",
options=opciones
):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
return resultado
async def worker_redactor_investigacion(
tema: str,
hallazgos: List[Hallazgo],
analisis: str
) -> str:
"""Redacta el reporte final de investigación."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt="""Eres un redactor técnico senior que produce reportes ejecutivos.
El reporte debe tener:
- Título y fecha
- Resumen ejecutivo (máximo 3 párrafos)
- Metodología breve
- Hallazgos principales (con evidencia citada)
- Análisis e interpretación
- Recomendaciones concretas priorizadas (P0, P1, P2)
- Próximos pasos sugeridos
- Referencias (lista de fuentes)
Usa Markdown con encabezados, listas y tablas donde corresponda.
Sé conciso pero completo.""",
max_turns=5,
)
hallazgos_resumen = "\n".join([
f"- [{h.relevancia}/10] {h.titulo}: {h.fuente}"
for h in hallazgos
])
prompt = f"""Redacta un reporte de investigación completo sobre: "{tema}"
## Hallazgos recolectados:
{hallazgos_resumen}
## Análisis técnico:
{analisis}
"""
resultado = ""
async for mensaje in query(prompt=prompt, options=opciones):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
return resultado
async def sistema_investigacion(tema: str, directorio: str) -> Investigacion:
"""
Sistema completo de investigación multi-agente.
Patrón: Hub-and-Spoke con dependencias secuenciales.
Resiliencia: cada etapa tiene manejo de errores independiente.
"""
investigacion = Investigacion(tema=tema)
print(f"[Investigación] Iniciando: '{tema}'")
# Paso 1: Búsqueda
try:
print("[Investigación] Buscando hallazgos...")
investigacion.hallazgos = await asyncio.wait_for(
worker_buscador(tema, directorio),
timeout=120.0
)
print(f"[Investigación] Encontrados {len(investigacion.hallazgos)} hallazgos")
except asyncio.TimeoutError:
investigacion.errores.append("Búsqueda: timeout después de 120s")
print("[Investigación] Advertencia: búsqueda agotó el timeout")
if not investigacion.hallazgos:
investigacion.reporte = "# Sin resultados\n\nNo se encontró información sobre el tema."
return investigacion
# Paso 2: Análisis
try:
print("[Investigación] Analizando hallazgos...")
investigacion.analisis = await asyncio.wait_for(
worker_analizador(tema, investigacion.hallazgos),
timeout=60.0
)
except Exception as e:
investigacion.errores.append(f"Análisis: {str(e)}")
investigacion.analisis = "Análisis no disponible por error."
print(f"[Investigación] Advertencia: análisis falló: {e}")
# Paso 3: Redacción
try:
print("[Investigación] Redactando reporte...")
investigacion.reporte = await asyncio.wait_for(
worker_redactor_investigacion(
tema,
investigacion.hallazgos,
investigacion.analisis
),
timeout=60.0
)
except Exception as e:
investigacion.errores.append(f"Redacción: {str(e)}")
# Reporte de emergencia con los datos disponibles
investigacion.reporte = (
f"# Reporte: {tema}\n\n"
f"## Hallazgos\n{hallazgos_resumen}\n\n"
f"## Análisis\n{investigacion.analisis}"
)
print(f"[Investigación] Completado. Errores: {len(investigacion.errores)}")
return investigacion
# Uso
async def main():
resultado = await sistema_investigacion(
tema="patrones de autenticación",
directorio="/home/usuario/mi-proyecto"
)
print(resultado.reporte)
if resultado.errores:
print(f"\nAdvertencias: {resultado.errores}")
asyncio.run(main())
12. Ejemplo completo: Agente de refactoring paralelo
Procesa múltiples archivos en paralelo, con control de concurrencia y reporte detallado.
import asyncio
import json
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass
from typing import List
@dataclass
class ResultadoRefactoring:
archivo: str
cambios_realizados: List[str]
mejoras: str
exito: bool
error: str = ""
async def refactorizar_archivo(ruta: str) -> ResultadoRefactoring:
"""Refactoriza un archivo Python individual."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Edit"],
disallowed_tools=["Bash", "Write", "Glob", "Grep"],
system_prompt="""Eres un experto en refactoring de Python.
Aplica SOLO estas mejoras cuando sea necesario:
1. Extrae funciones largas (>30 líneas) en funciones más pequeñas
2. Elimina código duplicado evidente
3. Mejora nombres de variables poco descriptivos (x, tmp, d, etc.)
4. Agrega type hints donde falten en funciones
5. Simplifica lógica booleana compleja
REGLAS CRÍTICAS:
- NO cambies la funcionalidad bajo ninguna circunstancia
- NO modifiques tests existentes
- Haz cambios conservadores y seguros
- Si el archivo ya está bien, NO toques nada
Al finalizar, retorna SOLO este JSON:
{"cambios": ["descripcion del cambio 1", "descripcion 2"], "mejoras": "resumen en una oración"}
Si no hiciste cambios: {"cambios": [], "mejoras": "Archivo ya cumple los estándares"}""",
max_turns=10,
)
try:
resultado = ""
async for mensaje in query(
prompt=f"Refactoriza este archivo: {ruta}",
options=opciones
):
if hasattr(mensaje, 'content'):
for bloque in mensaje.content:
if hasattr(bloque, 'text'):
resultado = bloque.text
# Extraer JSON de cambios del final del resultado
cambios = []
mejoras = ""
try:
inicio = resultado.rfind('{"cambios"')
if inicio >= 0:
json_str = resultado[inicio:resultado.rfind('}')+1]
datos = json.loads(json_str)
cambios = datos.get("cambios", [])
mejoras = datos.get("mejoras", "")
except (json.JSONDecodeError, ValueError):
# Si no hay JSON, el resultado mismo es la descripción
mejoras = resultado[-300:] if len(resultado) > 300 else resultado
return ResultadoRefactoring(
archivo=ruta,
cambios_realizados=cambios,
mejoras=mejoras,
exito=True
)
except Exception as e:
return ResultadoRefactoring(
archivo=ruta,
cambios_realizados=[],
mejoras="",
exito=False,
error=str(e)
)
async def refactoring_paralelo(
directorio: str,
patron: str = "**/*.py",
max_concurrente: int = 4,
excluir: List[str] = None
) -> dict:
"""Refactoriza todos los archivos Python en paralelo con control de concurrencia."""
excluir = excluir or ["__pycache__", "test_", "migrations", ".venv"]
archivos = [
str(p) for p in Path(directorio).glob(patron)
if not any(excl in str(p) for excl in excluir)
]
print(f"[Refactoring] Procesando {len(archivos)} archivos, {max_concurrente} concurrentes")
semaforo = asyncio.Semaphore(max_concurrente)
async def procesar_con_limite(archivo: str) -> ResultadoRefactoring:
async with semaforo:
nombre = Path(archivo).name
print(f"[Refactoring] Iniciando: {nombre}")
result = await refactorizar_archivo(archivo)
estado = "OK" if result.exito else "ERROR"
cambios = len(result.cambios_realizados)
print(f"[Refactoring] {estado} {nombre} ({cambios} cambios)")
return result
tareas = [procesar_con_limite(arch) for arch in archivos]
resultados = await asyncio.gather(*tareas)
exitosos = [r for r in resultados if r.exito]
fallidos = [r for r in resultados if not r.exito]
modificados = [r for r in exitosos if r.cambios_realizados]
total_cambios = sum(len(r.cambios_realizados) for r in exitosos)
return {
"total_archivos": len(archivos),
"exitosos": len(exitosos),
"fallidos": len(fallidos),
"archivos_modificados": len(modificados),
"total_cambios_aplicados": total_cambios,
"archivos_sin_cambios": len(exitosos) - len(modificados),
"cambios_por_archivo": [
{"archivo": r.archivo, "cambios": r.cambios_realizados}
for r in modificados
],
"errores": [
{"archivo": r.archivo, "error": r.error}
for r in fallidos
],
}
async def main():
stats = await refactoring_paralelo(
directorio="/mi/proyecto/src",
max_concurrente=3
)
print(f"\n=== Resultado del Refactoring ===")
print(f"Archivos procesados: {stats['exitosos']}/{stats['total_archivos']}")
print(f"Archivos modificados: {stats['archivos_modificados']}")
print(f"Total cambios: {stats['total_cambios_aplicados']}")
if stats['fallidos'] > 0:
print(f"Errores: {stats['fallidos']}")
for err in stats['errores']:
print(f" - {err['archivo']}: {err['error']}")
asyncio.run(main())
13. Ejemplo completo: Pipeline de CI/CD
Un sistema multi-agente que coordina el pipeline completo: análisis → tests → reporte.
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict
class EstadoEtapa(Enum):
PENDIENTE = "pendiente"
EN_PROGRESO = "en_progreso"
EXITOSO = "exitoso"
FALLIDO = "fallido"
OMITIDO = "omitido"
@dataclass
class EtapaCICD:
nombre: str
descripcion: str
estado: EstadoEtapa = EstadoEtapa.PENDIENTE
resultado: str = ""
duracion_segundos: float = 0.0
error: str = ""
es_critica: bool = True # Si falla, detiene el pipeline
async def ejecutar_analisis_estatico(directorio: str) -> EtapaCICD:
"""Análisis estático: linting, type checking, format."""
import time
etapa = EtapaCICD(
nombre="analisis_estatico",
descripcion="Análisis estático de código",
es_critica=False # Advertencias no detienen el pipeline
)
etapa.estado = EstadoEtapa.EN_PROGRESO
inicio = time.time()
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=["Bash", "Glob", "Read"],
disallowed_tools=["Write", "Edit"],
system_prompt="""Ejecuta análisis estático del código Python.
Verifica:
1. Sintaxis correcta (python -m py_compile)
2. Imports innecesarios o faltantes
3. Variables no utilizadas
4. Funciones demasiado largas (>50 líneas)
Retorna JSON: {"aprobado": true/false, "advertencias": [], "errores": []}
Sé estricto pero justo.""",
max_turns=8,
cwd=directorio,
)
try:
resultado = ""
async for m in query(
prompt="Ejecuta análisis estático completo del proyecto Python.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
etapa.resultado = resultado
etapa.estado = EstadoEtapa.EXITOSO
except Exception as e:
etapa.error = str(e)
etapa.estado = EstadoEtapa.FALLIDO
etapa.duracion_segundos = time.time() - inicio
return etapa
async def ejecutar_tests(directorio: str) -> EtapaCICD:
"""Ejecuta la suite de tests con pytest."""
import time
etapa = EtapaCICD(
nombre="tests",
descripcion="Suite de tests unitarios e integración",
es_critica=True # Tests fallidos detienen el pipeline
)
etapa.estado = EstadoEtapa.EN_PROGRESO
inicio = time.time()
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=["Bash"],
system_prompt="""Ejecuta los tests del proyecto.
Detecta automáticamente el framework (pytest, unittest, jest, etc.).
Ejecuta los tests y reporta resultados.
Retorna JSON:
{
"aprobado": true/false,
"tests_pasados": N,
"tests_fallidos": N,
"cobertura": "X%",
"tests_fallidos_detalle": ["test_name: error message"]
}""",
max_turns=5,
cwd=directorio,
)
try:
resultado = ""
async for m in query(
prompt="Ejecuta todos los tests y reporta los resultados en JSON.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
etapa.resultado = resultado
etapa.estado = EstadoEtapa.EXITOSO
except Exception as e:
etapa.error = str(e)
etapa.estado = EstadoEtapa.FALLIDO
etapa.duracion_segundos = time.time() - inicio
return etapa
async def ejecutar_revision_seguridad(directorio: str) -> EtapaCICD:
"""Revisión básica de seguridad."""
import time
etapa = EtapaCICD(
nombre="seguridad",
descripcion="Revisión de vulnerabilidades de seguridad",
es_critica=False
)
etapa.estado = EstadoEtapa.EN_PROGRESO
inicio = time.time()
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=["Grep", "Read", "Glob"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Realiza una revisión básica de seguridad.
Busca:
- Secrets hardcodeados (passwords, API keys, tokens)
- SQL injection potencial
- Uso de eval() o exec()
- Dependencias con vulnerabilidades conocidas
Retorna JSON: {"aprobado": true/false, "problemas_criticos": [], "advertencias": []}""",
max_turns=8,
cwd=directorio,
)
try:
resultado = ""
async for m in query(
prompt="Realiza revisión de seguridad del código fuente.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
etapa.resultado = resultado
etapa.estado = EstadoEtapa.EXITOSO
except Exception as e:
etapa.error = str(e)
etapa.estado = EstadoEtapa.FALLIDO
etapa.duracion_segundos = time.time() - inicio
return etapa
async def generar_reporte_pipeline(etapas: List[EtapaCICD], proyecto: str) -> str:
"""Genera reporte final del pipeline."""
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=[],
system_prompt="""Genera un reporte de CI/CD conciso en Markdown.
Incluye: estado general (PASS/FAIL), tabla de etapas, problemas críticos, recomendaciones.""",
max_turns=3,
)
resumen_etapas = "\n".join([
f"- {e.nombre}: {e.estado.value} ({e.duracion_segundos:.1f}s)\n {e.resultado[:200] if e.resultado else e.error}"
for e in etapas
])
resultado = ""
async for m in query(
prompt=f"Genera reporte de CI/CD para '{proyecto}':\n\n{resumen_etapas}",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def pipeline_cicd_completo(directorio: str, proyecto: str = "mi-proyecto") -> dict:
"""
Pipeline CI/CD multi-agente completo.
Análisis y seguridad en paralelo, luego tests, luego reporte.
"""
import time
print(f"[CI/CD] Iniciando pipeline para: {proyecto}")
inicio_total = time.time()
etapas: List[EtapaCICD] = []
# FASE 1: Análisis y seguridad en paralelo (no bloquean el build)
print("[CI/CD] Fase 1: Análisis paralelo...")
analisis, seguridad = await asyncio.gather(
ejecutar_analisis_estatico(directorio),
ejecutar_revision_seguridad(directorio)
)
etapas.extend([analisis, seguridad])
# FASE 2: Tests (críticos — si fallan, se reporta pero no bloquea)
print("[CI/CD] Fase 2: Tests...")
tests = await ejecutar_tests(directorio)
etapas.append(tests)
# FASE 3: Reporte final
print("[CI/CD] Fase 3: Generando reporte...")
reporte = await generar_reporte_pipeline(etapas, proyecto)
duracion_total = time.time() - inicio_total
estado_global = all(
e.estado == EstadoEtapa.EXITOSO
for e in etapas
if e.es_critica
)
print(f"[CI/CD] Pipeline completado en {duracion_total:.1f}s — {'PASS' if estado_global else 'FAIL'}")
return {
"estado": "PASS" if estado_global else "FAIL",
"duracion_total": round(duracion_total, 1),
"etapas": [
{
"nombre": e.nombre,
"estado": e.estado.value,
"duracion": round(e.duracion_segundos, 1),
"critica": e.es_critica
}
for e in etapas
],
"reporte": reporte,
}
asyncio.run(pipeline_cicd_completo("/mi/proyecto", "mi-api"))
14. Testing de sistemas multi-agente
Mocking de subagentes en Python
import pytest
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
def crear_mock_query(respuesta: str):
"""Crea un mock del generador async de query()."""
class MensajeMock:
def __init__(self, texto):
self.content = [MagicMock(text=texto)]
async def _generador(*args, **kwargs):
yield MensajeMock(respuesta)
return _generador
@pytest.mark.asyncio
async def test_worker_buscador_parsea_json_correctamente():
"""Verifica que el worker parsea JSON de hallazgos."""
respuesta_mock = json.dumps([{
"titulo": "Configuración de auth",
"contenido": "Usa JWT con RS256",
"fuente": "auth/config.py",
"relevancia": 9
}])
with patch('claude_code_sdk.query', crear_mock_query(respuesta_mock)):
hallazgos = await worker_buscador("autenticación", "/proyecto")
assert len(hallazgos) == 1
assert hallazgos[0].titulo == "Configuración de auth"
assert hallazgos[0].relevancia == 9
@pytest.mark.asyncio
async def test_worker_buscador_maneja_json_invalido():
"""Verifica fallback cuando el JSON está malformado."""
with patch('claude_code_sdk.query', crear_mock_query("No hay JSON aquí, solo texto")):
hallazgos = await worker_buscador("tema", "/proyecto")
# Debe retornar al menos un hallazgo fallback
assert len(hallazgos) >= 1
@pytest.mark.asyncio
async def test_coordinador_continua_si_analisis_falla():
"""Verifica que el sistema produce reporte aunque el análisis falle."""
call_count = 0
def query_que_falla_en_analisis(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 2: # Segunda llamada = análisis
async def _gen():
raise Exception("Error de análisis simulado")
yield # Hace al generador válido
return _gen()
else:
return crear_mock_query("Resultado genérico")(*args, **kwargs)
with patch('claude_code_sdk.query', query_que_falla_en_analisis):
investigacion = await sistema_investigacion("tema", "/proyecto")
assert investigacion.reporte != "" # Debe tener reporte aunque sea de fallback
assert len(investigacion.errores) > 0 # Debe registrar el error
@pytest.mark.asyncio
async def test_fan_out_respeta_limite_concurrencia():
"""Verifica que el fan-out no supera la concurrencia máxima."""
concurrentes_actuales = 0
max_concurrentes = 0
original_analizar = analizar_archivo
async def analizar_con_contador(ruta: str):
nonlocal concurrentes_actuales, max_concurrentes
concurrentes_actuales += 1
max_concurrentes = max(max_concurrentes, concurrentes_actuales)
await asyncio.sleep(0.01) # Simular trabajo
concurrentes_actuales -= 1
return {"archivo": ruta, "problemas": [], "puntuacion": 8, "resumen": "OK"}
archivos = [f"archivo_{i}.py" for i in range(10)]
with patch('__main__.analizar_archivo', analizar_con_contador):
await fan_out_controlado(archivos, analizar_con_contador, max_concurrente=3)
assert max_concurrentes <= 3, f"Se excedió la concurrencia: {max_concurrentes}"
TypeScript: Testing con Jest y mocks
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
jest.mock("@anthropic-ai/claude-code-sdk");
const mockQuery = query as jest.MockedFunction<typeof query>;
function crearMockQuery(respuesta: string) {
return async function* () {
yield {
type: "assistant" as const,
message: {
content: [{ type: "text" as const, text: respuesta }],
},
};
};
}
describe("Sistema multi-agente", () => {
beforeEach(() => jest.clearAllMocks());
describe("workerInvestigacion", () => {
it("retorna resultado exitoso cuando el query funciona", async () => {
mockQuery.mockImplementation(crearMockQuery("Estructura del proyecto") as any);
const resultado = await workerInvestigacion("/proyecto");
expect(resultado.success).toBe(true);
expect(resultado.result).toBe("Estructura del proyecto");
expect(mockQuery).toHaveBeenCalledTimes(1);
});
it("retorna error cuando el query falla", async () => {
mockQuery.mockImplementation(async function* () {
throw new Error("API Error");
} as any);
const resultado = await workerInvestigacion("/proyecto");
expect(resultado.success).toBe(false);
expect(resultado.error).toContain("API Error");
});
it("usa las opciones correctas para el modelo liviano", async () => {
mockQuery.mockImplementation(crearMockQuery("resultado") as any);
await workerInvestigacion("/proyecto");
const llamadaOpciones = (mockQuery.mock.calls[0][0] as any).options;
expect(llamadaOpciones.model).toBe("claude-haiku-4-5");
expect(llamadaOpciones.disallowedTools).toContain("Bash");
expect(llamadaOpciones.disallowedTools).toContain("Write");
});
});
describe("coordinadorHub", () => {
it("completa el flujo completo cuando todos los workers funcionan", async () => {
let llamadas = 0;
mockQuery.mockImplementation(async function* () {
llamadas++;
yield {
type: "assistant" as const,
message: { content: [{ type: "text" as const, text: `Resultado ${llamadas}` }] },
};
} as any);
const resultado = await coordinadorHub("/proyecto");
expect(resultado).toBeTruthy();
expect(llamadas).toBe(3); // investigación + análisis + redacción
});
});
});
15. Anti-patrones a evitar
Anti-patrón 1: Demasiados niveles de anidación
# MAL: 4 niveles — imposible de depurar y muy caro
async def orquestador_nivel_1():
async def orquestador_nivel_2():
async def orquestador_nivel_3():
async def worker_nivel_4():
pass # El costo se multiplica exponencialmente
# BIEN: máximo 2-3 niveles, workers directos desde el coordinador
async def orquestador():
resultados = await asyncio.gather(
worker_a(),
worker_b(),
worker_c()
)
Anti-patrón 2: Workers sin límite de herramientas
# MAL: worker con acceso total al sistema sin justificación
opciones_inseguras = ClaudeCodeOptions(
# Sin restricciones de herramientas
max_turns=100, # Sin límite práctico — puede ejecutarse por horas
)
# BIEN: principio de mínimo privilegio
opciones_seguras = ClaudeCodeOptions(
allowed_tools=["Read", "Glob"], # Solo lo estrictamente necesario
disallowed_tools=["Bash", "Write", "Edit"], # Bloquear explícitamente lo peligroso
max_turns=10, # Límite razonable para la tarea
)
Anti-patrón 3: Ignorar errores de workers
# MAL: falla silenciosa — produce resultados incorrectos sin saberlo
async def orquestador_malo():
resultado_1 = await worker_1()
resultado_2 = await worker_2()
return consolidar(resultado_1, resultado_2)
# Si cualquier worker retorna None o string vacío, consolidar falla silenciosamente
# BIEN: manejo explícito con decisiones claras
async def orquestador_bueno():
resultado_1 = await worker_1()
if not resultado_1.exito:
if resultado_1.es_critico:
raise RuntimeError(f"Worker crítico falló: {resultado_1.error}")
else:
print(f"Advertencia: worker no crítico falló: {resultado_1.error}")
resultado_1.valor = "No disponible"
resultado_2 = await worker_2()
return consolidar(resultado_1.valor, resultado_2.valor if resultado_2.exito else "N/A")
Anti-patrón 4: System prompt demasiado genérico
# MAL: system prompt genérico que no especializa al worker
opciones_genericas = ClaudeCodeOptions(
system_prompt="Eres un asistente de código útil."
# El worker no sabe qué hacer, en qué formato retornar, ni qué NO hacer
)
# BIEN: system prompt específico con formato de output, rol claro y restricciones
opciones_especializadas = ClaudeCodeOptions(
system_prompt="""Eres un especialista en análisis de seguridad OWASP.
Tu ÚNICO objetivo: detectar vulnerabilidades OWASP Top 10 en código Python.
Formato de output OBLIGATORIO:
{"vulnerabilidades": [{"tipo": "A01|A02|...", "archivo": "...", "linea": N, "descripcion": "...", "severidad": "alta|media|baja"}]}
Si no hay vulnerabilidades: {"vulnerabilidades": []}
NO hagas recomendaciones fuera del JSON.
NO modifiques ningún archivo.
SOLO lee y analiza."""
)
Anti-patrón 5: Deadlocks por dependencias circulares
graph LR
A[Worker A espera resultado de B] -->|bloquea| B[Worker B espera resultado de A]
B -->|bloquea| A
style A fill:#ff6b6b,color:#fff
style B fill:#ff6b6b,color:#fff
# MAL: dependencias circulares — deadlock garantizado
async def worker_a(estado):
resultado_b = await esperar_resultado(estado, "worker_b") # Espera B
return procesar(resultado_b)
async def worker_b(estado):
resultado_a = await esperar_resultado(estado, "worker_a") # Espera A — DEADLOCK
return procesar(resultado_a)
# BIEN: dependencias unidireccionales
async def coordinador_sin_deadlock():
resultado_a = await worker_a() # A ejecuta primero
resultado_b = await worker_b(resultado_a) # B usa resultado de A
return combinar(resultado_a, resultado_b)
Anti-patrón 6: No controlar la concurrencia
# MAL: lanza todos los workers a la vez sin límite
archivos = obtener_1000_archivos()
tareas = [analizar_archivo(f) for f in archivos]
resultados = await asyncio.gather(*tareas)
# Lanza 1000 llamadas a la API simultáneamente — rate limit inmediato
# BIEN: controlar concurrencia con semáforo
semaforo = asyncio.Semaphore(5) # Máximo 5 concurrentes
async def analizar_con_limite(archivo):
async with semaforo:
return await analizar_archivo(archivo)
tareas = [analizar_con_limite(f) for f in archivos]
resultados = await asyncio.gather(*tareas)
Anti-patrón 7: Pasar contexto innecesario a los workers
# MAL: pasar todo el código fuente a un worker que solo necesita nombres de archivo
await worker_analisis(
contexto=todo_el_codigo_fuente_del_proyecto, # 100K tokens innecesarios
tarea="encuentra archivos Python"
)
# BIEN: pasar solo el contexto necesario para la tarea
archivos = await worker_explorador() # Primero obtener la lista
await worker_analisis(
contexto=archivos, # Solo la lista de rutas, no el contenido
tarea="analiza estos archivos"
)
Resumen del capítulo
Los sistemas multi-agente permiten escalar la capacidad de resolución de problemas al dividir trabajo complejo entre agentes especializados.
mindmap
root((Subagentes y Orquestación))
Patrones
Hub-and-Spoke
Workers especializados
Síntesis central
Pipeline
Transformación secuencial
Cada etapa transforma la anterior
Fan-out Fan-in
Máximo paralelismo
Consolidación de resultados
Jerárquico
Múltiples niveles de coordinación
Configuración
allowed_tools mínimo privilegio
max_turns siempre definir
system_prompt específico y con formato
model correcto para cada tarea
Comunicación
Estado compartido en memoria
Output como input
Sin comunicación directa entre workers
Resiliencia
Reintentos con backoff exponencial
Circuit breaker
Fallbacks para errores no críticos
Timeouts en todas las llamadas
Testing
Mock de la función query
Tests unitarios por worker
Tests de integración del orquestador
Verificar manejo de errores
Anti-patrones
Más de 3 niveles de anidación
Sin límites de herramientas
Ignorar errores de workers
Deadlocks circulares
Sin control de concurrencia
Próximo capítulo: Manejo de sesiones y contexto persistente entre múltiples queries.