Capítulo 7: Subagentes y Orquestación

Por: Artiko
claudesdksubagentesorquestacionmulti-agentepythontypescript

Capítulo 7: Subagentes y Orquestación

Los sistemas de un solo agente tienen límites naturales: un contexto finito, una sola línea de ejecución, y dificultad para especializar comportamiento. Los subagentes permiten dividir problemas complejos en tareas manejables, ejecutarlas en paralelo o en secuencia, y combinar resultados de manera confiable.


1. ¿Qué son los subagentes?

Un subagente es una instancia del SDK lanzada por otro agente (el orquestador o coordinador). El orquestador no ejecuta directamente las tareas — las delega a workers especializados y sintetiza sus resultados.

Jerarquía de agentes

graph TD
    U[Usuario / Aplicación]
    O[Orquestador]
    W1[Worker: Investigación]
    W2[Worker: Análisis]
    W3[Worker: Redacción]
    S1[Subworker A]
    S2[Subworker B]

    U -->|query principal| O
    O -->|delega tarea 1| W1
    O -->|delega tarea 2| W2
    O -->|delega tarea 3| W3
    W2 -->|subtarea| S1
    W2 -->|subtarea| S2
    W1 -->|resultado| O
    W2 -->|resultado| O
    W3 -->|resultado| O
    O -->|respuesta final| U

La herramienta Task vs lanzar un subagente manualmente

El SDK expone dos mecanismos para delegar trabajo:

MecanismoDescripciónCuándo usar
Task tool (interna)Claude puede invocar subagentes a través de la herramienta Task cuando el modelo lo decideCuando quieres que el modelo decida cuándo delegar
Subagente programáticoTu código lanza explícitamente un nuevo query() con su propio ClaudeCodeOptionsCuando tú controlas cuándo y cómo delegar

En producción, el patrón más robusto es el subagente programático: tu código orquesta explícitamente las llamadas, tienes control total sobre errores, reintentos y paralelismo.

Diferencia conceptual: Agente vs Sistema multi-agente

graph LR
    subgraph "Agente simple"
        A1[query] --> A2[herramientas] --> A3[resultado]
    end

    subgraph "Sistema multi-agente"
        B1[orquestador] --> B2[worker 1]
        B1 --> B3[worker 2]
        B1 --> B4[worker 3]
        B2 --> B5[resultado parcial 1]
        B3 --> B6[resultado parcial 2]
        B4 --> B7[resultado parcial 3]
        B5 --> B8[síntesis]
        B6 --> B8
        B7 --> B8
    end

Ventajas de los sistemas multi-agente

Los sistemas multi-agente no son simplemente “más agentes” — resuelven problemas cualitativos que un agente único no puede:

Especialización: Un worker de análisis de seguridad tiene un system prompt y herramientas completamente distintas a uno de redacción. Cada uno puede ser optimizado para su tarea específica.

Paralelismo: Tareas independientes se ejecutan simultáneamente, reduciendo la latencia total del sistema. Analizar 50 archivos en paralelo es 10x más rápido que secuencialmente.

Aislamiento de errores: Si un worker falla, los demás continúan. El orquestador decide si el error es fatal o si puede continuar con resultados parciales.

Mantenibilidad: Cada worker es un componente independiente, testeable en aislamiento. Cambiar la lógica de un worker no afecta a los demás.

Control de contexto: Cada agente tiene su propio contexto. El orquestador decide qué información comparte con cada worker, evitando que el contexto global se llene con información irrelevante.


2. Configuración de subagentes: ClaudeCodeOptions

Cada subagente es una llamada a query() con su propio conjunto de opciones. Puedes controlar herramientas disponibles, modelo, system prompt, y más.

Campos clave de ClaudeCodeOptions

# Python
from claude_code_sdk import query, ClaudeCodeOptions

opciones_worker = ClaudeCodeOptions(
    # Modelo a usar para este subagente
    model="claude-opus-4-5",

    # Herramientas disponibles SOLO para este agente
    allowed_tools=["Read", "Glob", "Grep"],

    # Herramientas completamente bloqueadas
    disallowed_tools=["Bash", "Write", "Edit"],

    # System prompt especializado
    system_prompt="""Eres un agente de análisis de seguridad.
    Solo puedes LEER archivos. Nunca modifiques nada.
    Tu output debe ser JSON con campos: vulnerabilidades, severidad, recomendaciones.""",

    # Máximo número de turnos (límite de seguridad)
    max_turns=10,

    # Directorio de trabajo
    cwd="/ruta/al/proyecto",

    # Variables de entorno disponibles
    allowed_env_vars=["PATH", "NODE_ENV"],
)
// TypeScript
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const opcionesWorker: ClaudeCodeOptions = {
  model: "claude-opus-4-5",
  allowedTools: ["Read", "Glob", "Grep"],
  disallowedTools: ["Bash", "Write", "Edit"],
  systemPrompt: `Eres un agente de análisis de seguridad.
    Solo puedes LEER archivos. Nunca modifiques nada.
    Tu output debe ser JSON con campos: vulnerabilidades, severidad, recomendaciones.`,
  maxTurns: 10,
  cwd: "/ruta/al/proyecto",
};

Campos importantes explicados

allowed_tools: Lista blanca de herramientas. Si está definida, el agente SOLO puede usar esas herramientas. Útil para workers especializados que no deben tener acceso completo.

disallowed_tools: Lista negra. Bloquea herramientas específicas sin importar qué más esté disponible. Útil para agregar restricciones a un conjunto base.

system_prompt: Define la personalidad, rol y restricciones del agente. Un buen system prompt para un subagente debe ser:

max_turns: Número máximo de vueltas de conversación. Fundamental para evitar agentes que se queden en bucles infinitos. En producción, siempre definir este valor.

cwd: Directorio de trabajo. Cada worker puede operar en un directorio diferente si es necesario.

Estrategia de selección de modelo por worker

No todos los workers necesitan el modelo más potente. Usar el modelo correcto para cada tarea optimiza costos y velocidad:

from claude_code_sdk import query, ClaudeCodeOptions

# Worker de exploración: tarea simple, modelo liviano
opciones_explorer = ClaudeCodeOptions(
    model="claude-haiku-4-5",  # Más rápido y barato
    allowed_tools=["Glob", "Grep"],
    max_turns=5,
    system_prompt="Lista archivos relevantes. Retorna JSON array de rutas."
)

# Worker de análisis: razonamiento complejo, modelo potente
opciones_analyzer = ClaudeCodeOptions(
    model="claude-opus-4-5",  # Más capaz para análisis profundo
    allowed_tools=["Read", "Grep"],
    max_turns=15,
    system_prompt="Analiza la arquitectura del código. Detecta problemas de diseño."
)

# Worker de formato: tarea mecánica, modelo liviano
opciones_formatter = ClaudeCodeOptions(
    model="claude-haiku-4-5",
    allowed_tools=[],  # Sin herramientas — solo transforma texto
    max_turns=3,
    system_prompt="Formatea el análisis como Markdown estructurado."
)
// TypeScript equivalente
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const opcionesExplorer: ClaudeCodeOptions = {
  model: "claude-haiku-4-5",
  allowedTools: ["Glob", "Grep"],
  maxTurns: 5,
  systemPrompt: "Lista archivos relevantes. Retorna JSON array de rutas.",
};

const opcionesAnalyzer: ClaudeCodeOptions = {
  model: "claude-opus-4-5",
  allowedTools: ["Read", "Grep"],
  maxTurns: 15,
  systemPrompt: "Analiza la arquitectura del código. Detecta problemas de diseño.",
};

const opcionesFormatter: ClaudeCodeOptions = {
  model: "claude-haiku-4-5",
  allowedTools: [],
  maxTurns: 3,
  systemPrompt: "Formatea el análisis como Markdown estructurado.",
};

3. Patrones de orquestación

Existen cuatro patrones principales para organizar sistemas multi-agente:

graph TD
    subgraph "Hub-and-Spoke"
        H[Hub/Coordinador]
        H --> S1[Spoke 1]
        H --> S2[Spoke 2]
        H --> S3[Spoke 3]
    end

    subgraph "Pipeline"
        P1[Etapa 1] --> P2[Etapa 2] --> P3[Etapa 3]
    end

    subgraph "Fan-out/Fan-in"
        FO[Fan-out]
        FO --> FW1[Worker 1]
        FO --> FW2[Worker 2]
        FO --> FW3[Worker 3]
        FW1 --> FI[Fan-in/Aggregator]
        FW2 --> FI
        FW3 --> FI
    end

    subgraph "Jerárquico"
        JO[Orquestador]
        JO --> JM1[Manager 1]
        JO --> JM2[Manager 2]
        JM1 --> JW1[Worker 1A]
        JM1 --> JW2[Worker 1B]
        JM2 --> JW3[Worker 2A]
    end

Comparación de patrones

PatrónParalelismoComplejidadMejor para
Hub-and-SpokeOpcionalMediaWorkers especializados, tareas independientes
PipelineNo (secuencial)BajaTransformaciones encadenadas
Fan-out/Fan-inMáximoAltaAnálisis de múltiples archivos, scraping
JerárquicoParcialMuy AltaProyectos grandes con sub-equipos

Cuándo usar cada patrón

Hub-and-Spoke: Cuando tienes tareas claramente diferenciadas por dominio (búsqueda, análisis, redacción) y el coordinador necesita sintetizar perspectivas diferentes.

Pipeline: Cuando cada paso transforma el output del anterior. Ideal para ETL, procesamiento de datos, o generación de contenido en etapas.

Fan-out/Fan-in: Cuando tienes un conjunto de items del mismo tipo (archivos, URLs, registros) que deben procesarse individualmente y luego consolidarse.

Jerárquico: Para proyectos muy grandes donde diferentes “equipos” de agentes trabajan en dominios separados con sus propios coordinadores.


4. Patrón Hub-and-Spoke

El coordinador (hub) recibe la tarea principal, la descompone, y delega fragmentos a workers especializados (spokes). Cada spoke tiene herramientas y prompts optimizados para su función.

sequenceDiagram
    participant App as Aplicación
    participant Hub as Coordinador
    participant RW as Worker Researcher
    participant AW as Worker Analyst
    participant WW as Worker Writer

    App->>Hub: "Análisis completo del proyecto X"
    Hub->>RW: "Encuentra todos los archivos de configuración"
    Hub->>AW: "Analiza la arquitectura del código fuente"
    RW-->>Hub: Lista de configs encontradas
    AW-->>Hub: Análisis de arquitectura
    Hub->>WW: "Genera reporte con estos hallazgos"
    WW-->>Hub: Reporte en Markdown
    Hub-->>App: Reporte final consolidado

Implementación completa en Python

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass
from typing import Optional

@dataclass
class ResultadoWorker:
    nombre: str
    resultado: str
    exito: bool
    error: Optional[str] = None

async def worker_investigacion(directorio: str) -> ResultadoWorker:
    """Worker especializado en explorar la estructura del proyecto."""
    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",  # Modelo liviano para exploración
        allowed_tools=["Read", "Glob", "Grep"],
        disallowed_tools=["Bash", "Write", "Edit"],
        system_prompt="""Eres un agente de exploración de código.
Tu tarea es catalogar la estructura del proyecto.
Retorna SOLO un JSON con este formato exacto:
{
  "archivos_principales": ["ruta1", "ruta2"],
  "lenguajes": ["Python", "TypeScript"],
  "frameworks": ["FastAPI", "React"],
  "configuraciones": ["package.json", "pyproject.toml"],
  "tests": "sí/no",
  "documentacion": "sí/no"
}""",
        max_turns=8,
        cwd=directorio,
    )

    try:
        resultado_texto = ""
        async for mensaje in query(
            prompt=f"Explora el proyecto en {directorio} y retorna el JSON de estructura.",
            options=opciones
        ):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado_texto = bloque.text

        return ResultadoWorker(
            nombre="investigacion",
            resultado=resultado_texto,
            exito=True
        )
    except Exception as e:
        return ResultadoWorker(
            nombre="investigacion",
            resultado="",
            exito=False,
            error=str(e)
        )

async def worker_analisis(directorio: str, contexto: str) -> ResultadoWorker:
    """Worker especializado en análisis de calidad de código."""
    opciones = ClaudeCodeOptions(
        model="claude-opus-4-5",  # Modelo más potente para análisis
        allowed_tools=["Read", "Glob", "Grep"],
        disallowed_tools=["Bash", "Write", "Edit"],
        system_prompt="""Eres un experto en análisis de calidad de código.
Analiza el código fuente y detecta:
- Problemas de arquitectura
- Code smells
- Deuda técnica
- Violaciones de SOLID
- Riesgos de seguridad obvios

Retorna un análisis estructurado en Markdown con secciones claras.""",
        max_turns=15,
        cwd=directorio,
    )

    try:
        resultado_texto = ""
        async for mensaje in query(
            prompt=f"Basado en esta estructura: {contexto}\n\nAnaliza la calidad del código.",
            options=opciones
        ):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado_texto = bloque.text

        return ResultadoWorker(
            nombre="analisis",
            resultado=resultado_texto,
            exito=True
        )
    except Exception as e:
        return ResultadoWorker(
            nombre="analisis",
            resultado="",
            exito=False,
            error=str(e)
        )

async def worker_redactor(hallazgos: dict) -> ResultadoWorker:
    """Worker especializado en generar reportes ejecutivos."""
    opciones = ClaudeCodeOptions(
        model="claude-opus-4-5",
        allowed_tools=[],  # Sin herramientas — solo genera texto
        system_prompt="""Eres un experto en comunicación técnica.
Conviertes análisis técnicos en reportes claros y accionables.
El reporte debe tener:
1. Resumen ejecutivo (3-5 oraciones)
2. Hallazgos principales (lista priorizada)
3. Recomendaciones inmediatas
4. Recomendaciones a largo plazo
5. Métricas de salud del proyecto

Usa Markdown con encabezados, listas y tablas donde corresponda.""",
        max_turns=5,
    )

    prompt = f"""Genera un reporte ejecutivo basado en estos hallazgos:

## Estructura del Proyecto
{hallazgos.get('investigacion', 'No disponible')}

## Análisis de Calidad
{hallazgos.get('analisis', 'No disponible')}
"""

    try:
        resultado_texto = ""
        async for mensaje in query(prompt=prompt, options=opciones):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado_texto = bloque.text

        return ResultadoWorker(
            nombre="redactor",
            resultado=resultado_texto,
            exito=True
        )
    except Exception as e:
        return ResultadoWorker(
            nombre="redactor",
            resultado="",
            exito=False,
            error=str(e)
        )

async def coordinador_hub(directorio: str) -> str:
    """
    Coordinador principal que orquesta todos los workers.
    Patrón: Hub-and-Spoke con ejecución parcialmente paralela.
    """
    print(f"[Hub] Iniciando análisis de: {directorio}")

    # FASE 1: Investigación (primer paso, provee contexto para análisis)
    print("[Hub] Lanzando worker de investigación...")
    res_investigacion = await worker_investigacion(directorio)

    if not res_investigacion.exito:
        return f"Error en investigación: {res_investigacion.error}"

    print(f"[Hub] Investigación completada. Lanzando análisis...")

    # FASE 2: Análisis usa el contexto de investigación
    res_analisis = await worker_analisis(directorio, res_investigacion.resultado)

    if not res_analisis.exito:
        print(f"[Hub] Advertencia: análisis falló: {res_analisis.error}")
        res_analisis.resultado = "Análisis no disponible"

    # FASE 3: Redacción consolida todo
    print("[Hub] Generando reporte final...")
    hallazgos = {
        "investigacion": res_investigacion.resultado,
        "analisis": res_analisis.resultado,
    }

    res_reporte = await worker_redactor(hallazgos)

    if not res_reporte.exito:
        return f"Error generando reporte: {res_reporte.error}"

    print("[Hub] Análisis completo!")
    return res_reporte.resultado

# Uso
if __name__ == "__main__":
    reporte = asyncio.run(coordinador_hub("/home/usuario/mi-proyecto"))
    print(reporte)

Implementación en TypeScript

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface WorkerResult {
  name: string;
  result: string;
  success: boolean;
  error?: string;
}

async function workerInvestigacion(directorio: string): Promise<WorkerResult> {
  const opciones: ClaudeCodeOptions = {
    model: "claude-haiku-4-5",
    allowedTools: ["Read", "Glob", "Grep"],
    disallowedTools: ["Bash", "Write", "Edit"],
    systemPrompt: `Eres un agente de exploración de código.
Cataloga la estructura del proyecto en JSON con campos:
archivos_principales, lenguajes, frameworks, configuraciones, tests, documentacion.`,
    maxTurns: 8,
    cwd: directorio,
  };

  try {
    let resultado = "";
    for await (const mensaje of query({
      prompt: `Explora el proyecto en ${directorio}`,
      options: opciones,
    })) {
      if (mensaje.type === "assistant" && mensaje.message.content) {
        for (const bloque of mensaje.message.content) {
          if (bloque.type === "text") {
            resultado = bloque.text;
          }
        }
      }
    }
    return { name: "investigacion", result: resultado, success: true };
  } catch (e) {
    return {
      name: "investigacion",
      result: "",
      success: false,
      error: String(e),
    };
  }
}

async function workerAnalisis(
  directorio: string,
  contexto: string
): Promise<WorkerResult> {
  const opciones: ClaudeCodeOptions = {
    model: "claude-opus-4-5",
    allowedTools: ["Read", "Glob", "Grep"],
    disallowedTools: ["Bash", "Write", "Edit"],
    systemPrompt: `Eres un experto en análisis de calidad de código.
Detecta problemas de arquitectura, code smells, deuda técnica y riesgos de seguridad.
Retorna análisis estructurado en Markdown.`,
    maxTurns: 15,
    cwd: directorio,
  };

  try {
    let resultado = "";
    for await (const mensaje of query({
      prompt: `Basado en esta estructura:\n${contexto}\n\nAnaliza la calidad del código.`,
      options: opciones,
    })) {
      if (mensaje.type === "assistant" && mensaje.message.content) {
        for (const bloque of mensaje.message.content) {
          if (bloque.type === "text") {
            resultado = bloque.text;
          }
        }
      }
    }
    return { name: "analisis", result: resultado, success: true };
  } catch (e) {
    return { name: "analisis", result: "", success: false, error: String(e) };
  }
}

async function coordinadorHub(directorio: string): Promise<string> {
  console.log(`[Hub] Iniciando análisis de: ${directorio}`);

  // Paso 1: Investigación
  const resInvestigacion = await workerInvestigacion(directorio);
  if (!resInvestigacion.success) {
    return `Error: ${resInvestigacion.error}`;
  }

  // Paso 2: Análisis con contexto de investigación
  const resAnalisis = await workerAnalisis(
    directorio,
    resInvestigacion.result
  );
  const contextoAnalisis = resAnalisis.success
    ? resAnalisis.result
    : "Análisis no disponible";

  // Paso 3: Redacción final
  const opciones: ClaudeCodeOptions = {
    model: "claude-opus-4-5",
    allowedTools: [],
    systemPrompt: "Genera reporte ejecutivo en Markdown con hallazgos y recomendaciones.",
    maxTurns: 5,
  };

  let reporte = "";
  for await (const mensaje of query({
    prompt: `Genera reporte con:\n## Estructura\n${resInvestigacion.result}\n## Análisis\n${contextoAnalisis}`,
    options: opciones,
  })) {
    if (mensaje.type === "assistant" && mensaje.message.content) {
      for (const bloque of mensaje.message.content) {
        if (bloque.type === "text") reporte = bloque.text;
      }
    }
  }

  return reporte;
}

// Uso
coordinadorHub("/home/usuario/mi-proyecto").then(console.log);

5. Patrón Pipeline

En el patrón pipeline, cada agente transforma el output del agente anterior. El output de uno es el input del siguiente. Ideal para procesos de transformación lineales.

graph LR
    I[Input Raw] --> A1[Agente 1: Extractor]
    A1 -->|datos extraídos| A2[Agente 2: Normalizador]
    A2 -->|datos normalizados| A3[Agente 3: Enriquecedor]
    A3 -->|datos enriquecidos| A4[Agente 4: Formateador]
    A4 --> O[Output Final]

Implementación Python: Pipeline de análisis de logs

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Any

async def ejecutar_etapa_pipeline(
    nombre_etapa: str,
    prompt: str,
    input_datos: str,
    system_prompt: str,
    model: str = "claude-haiku-4-5",
    max_turns: int = 5
) -> str:
    """Ejecuta una etapa del pipeline y retorna su output."""
    opciones = ClaudeCodeOptions(
        model=model,
        allowed_tools=[],  # Pipeline de texto puro — sin herramientas
        system_prompt=system_prompt,
        max_turns=max_turns,
    )

    prompt_completo = f"{prompt}\n\n## Input:\n{input_datos}"

    resultado = ""
    async for mensaje in query(prompt=prompt_completo, options=opciones):
        if hasattr(mensaje, 'content'):
            for bloque in mensaje.content:
                if hasattr(bloque, 'text'):
                    resultado = bloque.text

    print(f"[Pipeline] Etapa '{nombre_etapa}' completada ({len(resultado)} chars)")
    return resultado

async def pipeline_analisis_logs(logs_raw: str) -> dict:
    """
    Pipeline de 4 etapas para analizar logs de aplicación.
    Cada etapa transforma el output de la anterior.
    """

    # Etapa 1: Extracción — identifica eventos importantes
    logs_estructurados = await ejecutar_etapa_pipeline(
        nombre_etapa="extraccion",
        prompt="Extrae y estructura los eventos importantes de estos logs.",
        input_datos=logs_raw,
        system_prompt="""Eres un parser de logs. Extrae eventos en JSON:
[{"timestamp": "...", "nivel": "ERROR|WARN|INFO", "mensaje": "...", "servicio": "..."}]
Incluye SOLO eventos ERROR y WARN. Retorna SOLO el JSON array.""",
        model="claude-haiku-4-5"
    )

    # Etapa 2: Clasificación — agrupa por tipo de problema
    logs_clasificados = await ejecutar_etapa_pipeline(
        nombre_etapa="clasificacion",
        prompt="Clasifica estos eventos por categoría de problema.",
        input_datos=logs_estructurados,
        system_prompt="""Eres un clasificador de incidentes.
Agrupa los eventos en categorías: database, network, auth, performance, unknown.
Retorna JSON: {"categoria": [lista_de_eventos]}""",
        model="claude-haiku-4-5"
    )

    # Etapa 3: Análisis de causa raíz
    analisis_causas = await ejecutar_etapa_pipeline(
        nombre_etapa="analisis_causas",
        prompt="Identifica causas raíz y patrones en estos incidentes clasificados.",
        input_datos=logs_clasificados,
        system_prompt="""Eres un experto en análisis de incidentes de producción.
Identifica:
1. Causa raíz probable de cada categoría
2. Patrones temporales (¿se repite cada X tiempo?)
3. Correlaciones entre categorías
4. Impacto estimado en usuarios

Sé específico y accionable.""",
        model="claude-opus-4-5",
        max_turns=8
    )

    # Etapa 4: Generación de recomendaciones
    recomendaciones = await ejecutar_etapa_pipeline(
        nombre_etapa="recomendaciones",
        prompt="Genera recomendaciones de remediación para este análisis.",
        input_datos=analisis_causas,
        system_prompt="""Eres un arquitecto de sistemas senior.
Genera recomendaciones priorizadas:
- P0: Crítico, acción inmediata (menos de 1 hora)
- P1: Alto, acción hoy (menos de 24 horas)
- P2: Medio, planificar esta semana
- P3: Bajo, backlog técnico

Para cada recomendación: descripción, pasos concretos, esfuerzo estimado (S/M/L).""",
        model="claude-opus-4-5",
        max_turns=8
    )

    return {
        "logs_estructurados": logs_estructurados,
        "clasificacion": logs_clasificados,
        "analisis_causas": analisis_causas,
        "recomendaciones": recomendaciones,
    }

# Uso del pipeline
async def main():
    logs = """
2026-03-23 10:15:32 ERROR [database] Connection timeout after 30s: host=db-primary
2026-03-23 10:15:45 ERROR [database] Max connections reached: pool_size=100
2026-03-23 10:16:01 WARN  [api] Response time > 5000ms: endpoint=/api/users
2026-03-23 10:16:15 ERROR [auth] JWT validation failed: token expired
2026-03-23 10:16:20 ERROR [database] Connection timeout after 30s: host=db-primary
"""
    resultado = await pipeline_analisis_logs(logs)
    print("=== RECOMENDACIONES FINALES ===")
    print(resultado["recomendaciones"])

asyncio.run(main())

Pipeline en TypeScript: transformación de datos

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface EtapaPipeline {
  nombre: string;
  systemPrompt: string;
  model?: string;
  maxTurns?: number;
}

async function ejecutarEtapa(
  etapa: EtapaPipeline,
  inputDatos: string
): Promise<string> {
  const opciones: ClaudeCodeOptions = {
    model: etapa.model ?? "claude-haiku-4-5",
    allowedTools: [],
    systemPrompt: etapa.systemPrompt,
    maxTurns: etapa.maxTurns ?? 5,
  };

  let resultado = "";
  for await (const mensaje of query({
    prompt: `Procesa este input:\n\n${inputDatos}`,
    options: opciones,
  })) {
    if (mensaje.type === "assistant" && mensaje.message.content) {
      for (const bloque of mensaje.message.content) {
        if (bloque.type === "text") resultado = bloque.text;
      }
    }
  }

  console.log(`[Pipeline] Etapa '${etapa.nombre}' completada (${resultado.length} chars)`);
  return resultado;
}

async function ejecutarPipeline(
  input: string,
  etapas: EtapaPipeline[]
): Promise<string> {
  let datosActuales = input;

  for (const etapa of etapas) {
    datosActuales = await ejecutarEtapa(etapa, datosActuales);
  }

  return datosActuales;
}

// Configuración del pipeline de análisis de código
const pipelineRevisionCodigo: EtapaPipeline[] = [
  {
    nombre: "extraccion",
    systemPrompt: "Extrae las funciones y clases del código. Retorna lista estructurada.",
    model: "claude-haiku-4-5",
    maxTurns: 3,
  },
  {
    nombre: "analisis",
    systemPrompt: "Analiza problemas de calidad en las funciones extraídas. Detecta code smells.",
    model: "claude-opus-4-5",
    maxTurns: 8,
  },
  {
    nombre: "sugerencias",
    systemPrompt: "Genera sugerencias de refactoring específicas para los problemas detectados.",
    model: "claude-opus-4-5",
    maxTurns: 8,
  },
  {
    nombre: "reporte",
    systemPrompt: "Formatea el análisis como reporte Markdown con secciones claras.",
    model: "claude-haiku-4-5",
    maxTurns: 3,
  },
];

// Uso
const codigoAAnalizar = `
def procesar(d, f, x):
    r = []
    for i in d:
        if f(i):
            r.append(x(i))
    return r
`;

ejecutarPipeline(codigoAAnalizar, pipelineRevisionCodigo).then(console.log);

6. Patrón Fan-out / Fan-in

El patrón fan-out lanza múltiples workers en paralelo para procesar diferentes piezas del mismo problema, luego un fan-in consolida todos los resultados.

graph TD
    T[Tarea: Analizar 50 archivos]
    T --> B[Batch Splitter]
    B --> W1[Worker: archivos 1-10]
    B --> W2[Worker: archivos 11-20]
    B --> W3[Worker: archivos 21-30]
    B --> W4[Worker: archivos 31-40]
    B --> W5[Worker: archivos 41-50]
    W1 --> AG[Aggregator / Fan-in]
    W2 --> AG
    W3 --> AG
    W4 --> AG
    W5 --> AG
    AG --> R[Reporte consolidado]

Implementación Python: Análisis paralelo de repositorio

import asyncio
import json
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
from typing import List

async def analizar_archivo(ruta: str) -> dict:
    """Analiza un archivo individual buscando problemas."""
    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",  # Haiku para velocidad y costo
        allowed_tools=["Read"],
        disallowed_tools=["Bash", "Write", "Edit", "Glob", "Grep"],
        system_prompt="""Eres un revisor de código. Analiza el archivo y retorna JSON:
{
  "archivo": "ruta",
  "problemas": [{"tipo": "bug|style|security|performance", "linea": N, "descripcion": "..."}],
  "puntuacion": 0-10,
  "resumen": "una oración"
}
Si el archivo está bien, retorna problemas: []""",
        max_turns=3,  # Límite bajo — es una tarea simple
    )

    try:
        resultado = ""
        async for mensaje in query(
            prompt=f"Analiza este archivo: {ruta}",
            options=opciones
        ):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado = bloque.text

        # Intentar parsear el JSON del resultado
        try:
            inicio = resultado.find('{')
            fin = resultado.rfind('}') + 1
            if inicio >= 0 and fin > inicio:
                return json.loads(resultado[inicio:fin])
        except json.JSONDecodeError:
            pass

        return {
            "archivo": ruta,
            "problemas": [],
            "puntuacion": 5,
            "resumen": resultado[:100]
        }

    except Exception as e:
        return {
            "archivo": ruta,
            "error": str(e),
            "problemas": [],
            "puntuacion": 0
        }

async def analizar_batch(archivos: List[str]) -> List[dict]:
    """Analiza un batch de archivos en paralelo (fan-out dentro del batch)."""
    tareas = [analizar_archivo(arch) for arch in archivos]
    resultados = await asyncio.gather(*tareas, return_exceptions=True)

    validos = []
    for r in resultados:
        if isinstance(r, Exception):
            print(f"[Fan-out] Error en batch: {r}")
        else:
            validos.append(r)
    return validos

async def fan_out_analisis(directorio: str, tamano_batch: int = 5) -> dict:
    """
    Ejecuta análisis de todos los archivos Python en paralelo.
    Fan-out: divide en batches paralelos.
    Fan-in: consolida resultados.
    """
    # Encontrar todos los archivos Python
    archivos = [
        str(p) for p in Path(directorio).glob("**/*.py")
        if "__pycache__" not in str(p)
    ]

    print(f"[Fan-out] Analizando {len(archivos)} archivos en batches de {tamano_batch}")

    # Dividir en batches
    batches = [
        archivos[i:i+tamano_batch]
        for i in range(0, len(archivos), tamano_batch)
    ]

    # Fan-out: ejecutar todos los batches en paralelo
    tareas_batch = [analizar_batch(batch) for batch in batches]
    resultados_batches = await asyncio.gather(*tareas_batch)

    # Fan-in: consolidar todos los resultados
    todos_resultados = []
    for batch_resultado in resultados_batches:
        todos_resultados.extend(batch_resultado)

    # Calcular métricas agregadas
    total_problemas = sum(len(r.get("problemas", [])) for r in todos_resultados)
    archivos_con_problemas = [r for r in todos_resultados if r.get("problemas")]
    puntuacion_promedio = (
        sum(r.get("puntuacion", 5) for r in todos_resultados) / len(todos_resultados)
        if todos_resultados else 0
    )

    # Clasificar problemas por tipo
    por_tipo: dict = {"bug": [], "security": [], "performance": [], "style": []}
    for resultado in todos_resultados:
        for problema in resultado.get("problemas", []):
            tipo = problema.get("tipo", "style")
            if tipo in por_tipo:
                por_tipo[tipo].append({
                    "archivo": resultado["archivo"],
                    **problema
                })

    return {
        "total_archivos": len(archivos),
        "archivos_analizados": len(todos_resultados),
        "total_problemas": total_problemas,
        "puntuacion_promedio": round(puntuacion_promedio, 1),
        "archivos_con_problemas": len(archivos_con_problemas),
        "problemas_por_tipo": {k: len(v) for k, v in por_tipo.items()},
        "problemas_criticos": por_tipo["bug"] + por_tipo["security"],
        "detalles": todos_resultados,
    }

# Uso
async def main():
    resultado = await fan_out_analisis("/mi/proyecto", tamano_batch=5)
    print(f"Análisis completado:")
    print(f"  Archivos: {resultado['total_archivos']}")
    print(f"  Puntuación: {resultado['puntuacion_promedio']}/10")
    print(f"  Problemas críticos: {len(resultado['problemas_criticos'])}")

asyncio.run(main())

Fan-out con semáforo para controlar concurrencia

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Callable, Any

async def fan_out_controlado(
    items: list,
    procesador: Callable,
    max_concurrente: int = 3
) -> list:
    """
    Fan-out con límite de concurrencia.
    Evita sobrecargar la API o el sistema de archivos.
    """
    semaforo = asyncio.Semaphore(max_concurrente)

    async def procesar_con_semaforo(item: Any):
        async with semaforo:
            return await procesador(item)

    tareas = [procesar_con_semaforo(item) for item in items]
    return await asyncio.gather(*tareas, return_exceptions=True)

# Uso
async def main():
    archivos = ["archivo1.py", "archivo2.py", "archivo3.py", "archivo4.py", "archivo5.py"]

    # Máximo 3 agentes concurrentes para no sobrecargar la API
    resultados = await fan_out_controlado(
        archivos,
        analizar_archivo,
        max_concurrente=3
    )
    print(f"Procesados: {len(resultados)} archivos")

asyncio.run(main())

Fan-out en TypeScript con Promise.allSettled

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface ResultadoArchivo {
  archivo: string;
  problemas: Array<{ tipo: string; linea: number; descripcion: string }>;
  puntuacion: number;
  resumen: string;
}

async function analizarArchivo(ruta: string): Promise<ResultadoArchivo> {
  const opciones: ClaudeCodeOptions = {
    model: "claude-haiku-4-5",
    allowedTools: ["Read"],
    disallowedTools: ["Bash", "Write", "Edit"],
    systemPrompt: `Analiza el archivo y retorna JSON:
{
  "archivo": "ruta",
  "problemas": [{"tipo": "bug|style|security|performance", "linea": N, "descripcion": "..."}],
  "puntuacion": 0-10,
  "resumen": "una oración"
}`,
    maxTurns: 3,
  };

  let resultado = "";
  for await (const mensaje of query({
    prompt: `Analiza: ${ruta}`,
    options: opciones,
  })) {
    if (mensaje.type === "assistant" && mensaje.message.content) {
      for (const bloque of mensaje.message.content) {
        if (bloque.type === "text") resultado = bloque.text;
      }
    }
  }

  try {
    const inicio = resultado.indexOf("{");
    const fin = resultado.lastIndexOf("}") + 1;
    if (inicio >= 0) return JSON.parse(resultado.slice(inicio, fin));
  } catch {}

  return { archivo: ruta, problemas: [], puntuacion: 5, resumen: resultado };
}

async function fanOutAnalisis(archivos: string[]): Promise<{
  exitosos: ResultadoArchivo[];
  fallidos: string[];
}> {
  // Promise.allSettled nunca rechaza — maneja errores individuales
  const resultados = await Promise.allSettled(
    archivos.map((archivo) => analizarArchivo(archivo))
  );

  const exitosos: ResultadoArchivo[] = [];
  const fallidos: string[] = [];

  resultados.forEach((resultado, index) => {
    if (resultado.status === "fulfilled") {
      exitosos.push(resultado.value);
    } else {
      fallidos.push(archivos[index]);
      console.error(`Error en ${archivos[index]}:`, resultado.reason);
    }
  });

  return { exitosos, fallidos };
}

// Uso
const archivos = ["src/auth.ts", "src/db.ts", "src/api.ts"];
fanOutAnalisis(archivos).then(({ exitosos, fallidos }) => {
  console.log(`Exitosos: ${exitosos.length}, Fallidos: ${fallidos.length}`);
});

7. Subagentes con aislamiento: worktrees Git

Cuando múltiples agentes trabajan sobre el mismo repositorio Git simultáneamente, pueden entrar en conflicto. El parámetro de aislamiento via worktree crea una copia del repositorio en rama separada para cada agente.

graph TD
    R[Repositorio Git Principal]
    R --> WT1[Worktree 1: rama agent-fix-auth]
    R --> WT2[Worktree 2: rama agent-refactor-db]
    R --> WT3[Worktree 3: rama agent-add-tests]

    WT1 --> A1[Agente 1: Arregla autenticación]
    WT2 --> A2[Agente 2: Refactoriza base de datos]
    WT3 --> A3[Agente 3: Agrega tests]

    A1 --> PR1[Pull Request 1]
    A2 --> PR2[Pull Request 2]
    A3 --> PR3[Pull Request 3]

Cómo funciona el worktree

Un Git worktree permite tener múltiples checkouts del mismo repositorio en diferentes directorios, cada uno en una rama diferente. Los archivos son independientes pero comparten el objeto de base de datos Git.

import asyncio
import subprocess
import tempfile
import os
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions

class WorktreeManager:
    """Gestiona la creación y limpieza de worktrees Git."""

    def __init__(self, repo_path: str):
        self.repo_path = Path(repo_path)

    def crear_worktree(self, rama: str) -> str:
        """Crea un worktree en un directorio temporal."""
        worktree_dir = tempfile.mkdtemp(prefix=f"agent-{rama}-")

        # Crear nueva rama desde HEAD
        subprocess.run(
            ["git", "worktree", "add", "-b", rama, worktree_dir],
            cwd=self.repo_path,
            check=True,
            capture_output=True
        )

        return worktree_dir

    def limpiar_worktree(self, worktree_dir: str, rama: str):
        """Elimina el worktree y su rama."""
        subprocess.run(
            ["git", "worktree", "remove", "--force", worktree_dir],
            cwd=self.repo_path,
            capture_output=True
        )
        subprocess.run(
            ["git", "branch", "-D", rama],
            cwd=self.repo_path,
            capture_output=True
        )

async def agente_en_worktree_aislado(
    repo_path: str,
    tarea: str,
    rama: str
) -> dict:
    """Ejecuta un agente en un worktree Git completamente aislado."""
    manager = WorktreeManager(repo_path)
    worktree_dir = None

    try:
        # Crear worktree aislado
        worktree_dir = manager.crear_worktree(rama)
        print(f"[Worktree] Creado worktree en: {worktree_dir} (rama: {rama})")

        opciones = ClaudeCodeOptions(
            model="claude-opus-4-5",
            allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"],
            system_prompt=f"""Trabajas en una rama aislada: {rama}.
Puedes hacer todos los cambios necesarios.
Al finalizar, ejecuta: git add -A && git commit -m "feat: {tarea[:50]}"
No hagas push — solo commit local.""",
            max_turns=20,
            cwd=worktree_dir,
        )

        resultado = ""
        async for mensaje in query(prompt=tarea, options=opciones):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado = bloque.text

        return {
            "rama": rama,
            "worktree_dir": worktree_dir,
            "resultado": resultado,
            "exito": True
        }

    except Exception as e:
        return {
            "rama": rama,
            "worktree_dir": worktree_dir,
            "resultado": "",
            "exito": False,
            "error": str(e)
        }
    # Nota: no limpiar aquí — el caller decide si hace merge o descarta

async def agentes_paralelos_aislados(
    repo_path: str,
    tareas: list
) -> list:
    """Lanza múltiples agentes en worktrees aislados simultáneamente."""
    coroutines = [
        agente_en_worktree_aislado(
            repo_path,
            tarea["descripcion"],
            tarea["rama"]
        )
        for tarea in tareas
    ]

    resultados = await asyncio.gather(*coroutines, return_exceptions=True)

    # Filtrar excepciones
    return [
        r if not isinstance(r, Exception) else {"error": str(r), "exito": False}
        for r in resultados
    ]

# Uso
async def main():
    tareas = [
        {
            "descripcion": "Arregla el bug de autenticación en auth/middleware.py",
            "rama": "fix/auth-bug"
        },
        {
            "descripcion": "Refactoriza la capa de base de datos para usar Repository Pattern",
            "rama": "refactor/db-layer"
        },
        {
            "descripcion": "Agrega tests unitarios para el módulo de pagos",
            "rama": "feat/payment-tests"
        },
    ]

    resultados = await agentes_paralelos_aislados("/mi/repo", tareas)

    for r in resultados:
        if r.get("exito"):
            print(f"[OK] Rama {r['rama']}: cambios listos para review")
        else:
            print(f"[Error] {r.get('rama', 'desconocida')}: {r.get('error')}")

asyncio.run(main())

8. Comunicación entre agentes

Los subagentes no se comunican directamente entre sí — toda comunicación pasa por el orquestador. Este patrón evita acoplamientos directos y hace el sistema más fácil de depurar.

sequenceDiagram
    participant O as Orquestador
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant Store as Estado Compartido

    O->>W1: Tarea A con contexto inicial
    W1-->>O: Resultado A
    O->>Store: Guardar resultado A
    O->>W2: Tarea B con resultado A como contexto
    W2-->>O: Resultado B
    O->>Store: Guardar resultado B
    Note over O: El orquestador decide qué<br/>información comparte con quién

Patrón: Estado compartido en memoria

from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class EstadoOrquestador:
    """Estado compartido gestionado por el orquestador."""
    contexto: Dict[str, Any] = field(default_factory=dict)
    resultados: Dict[str, str] = field(default_factory=dict)
    errores: list = field(default_factory=list)

    def agregar_resultado(self, worker: str, resultado: str):
        self.resultados[worker] = resultado

    def obtener_contexto_para(self, worker: str) -> str:
        """Construye el contexto relevante para un worker específico."""
        partes = []
        if worker == "analisis" and "investigacion" in self.resultados:
            partes.append(f"Estructura del proyecto:\n{self.resultados['investigacion']}")
        elif worker == "redactor":
            for k, v in self.resultados.items():
                partes.append(f"## {k.title()}\n{v}")
        return "\n\n".join(partes)

    def hay_errores_criticos(self) -> bool:
        """Determina si algún error impide continuar."""
        return any(e.get("critico", False) for e in self.errores)

async def orquestador_con_estado(directorio: str) -> str:
    """Orquestador que mantiene estado explícito entre workers."""
    estado = EstadoOrquestador()

    # Worker 1 con contexto mínimo
    res1 = await worker_investigacion(directorio)
    if res1.exito:
        estado.agregar_resultado("investigacion", res1.resultado)
    else:
        estado.errores.append({"worker": "investigacion", "error": res1.error, "critico": True})

    if estado.hay_errores_criticos():
        return "Pipeline abortado por error crítico"

    # Worker 2 recibe contexto del worker 1
    contexto_para_analisis = estado.obtener_contexto_para("analisis")
    res2 = await worker_analisis(directorio, contexto_para_analisis)
    if res2.exito:
        estado.agregar_resultado("analisis", res2.resultado)
    else:
        # Error no crítico — continuar con lo que tenemos
        estado.errores.append({"worker": "analisis", "error": res2.error, "critico": False})
        estado.agregar_resultado("analisis", "Análisis parcial no disponible")

    # Worker final recibe todo el contexto
    res3 = await worker_redactor(estado.resultados)
    return res3.resultado if res3.exito else "Error generando reporte final"

Patrón: Estado compartido con archivo temporal

Para sistemas más complejos donde los workers necesitan acceder a resultados de múltiples etapas previas, un archivo JSON temporal puede servir como “memoria” compartida:

import asyncio
import json
import tempfile
import os
from claude_code_sdk import query, ClaudeCodeOptions

class MemoriaCompartida:
    """Archivo JSON como estado compartido entre workers."""

    def __init__(self):
        self._archivo = tempfile.mktemp(suffix=".json")
        self._guardar({})

    def _guardar(self, datos: dict):
        with open(self._archivo, 'w', encoding='utf-8') as f:
            json.dump(datos, f, ensure_ascii=False, indent=2)

    def _cargar(self) -> dict:
        try:
            with open(self._archivo, encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            return {}

    def escribir(self, clave: str, valor: Any):
        datos = self._cargar()
        datos[clave] = valor
        self._guardar(datos)

    def leer(self, clave: str, default: Any = None) -> Any:
        return self._cargar().get(clave, default)

    def limpiar(self):
        if os.path.exists(self._archivo):
            os.remove(self._archivo)

# Uso en el orquestador
memoria = MemoriaCompartida()

async def worker_que_lee_memoria(clave_input: str, clave_output: str) -> str:
    """Worker que lee contexto de la memoria compartida."""
    contexto = memoria.leer(clave_input, "Sin contexto previo")

    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=[],
        max_turns=5,
        system_prompt="Procesa el input y retorna resultado estructurado."
    )

    resultado = ""
    async for m in query(
        prompt=f"Procesa esto:\n{contexto}",
        options=opciones
    ):
        if hasattr(m, 'content'):
            for b in m.content:
                if hasattr(b, 'text'):
                    resultado = b.text

    memoria.escribir(clave_output, resultado)
    return resultado

9. Manejo de errores en subagentes

Los errores en subagentes son inevitables en producción. El sistema debe manejarlos graciosamente: reintentar, usar fallbacks, o degradar funcionalidad.

flowchart TD
    T[Tarea al Worker]
    T --> E{¿Error?}
    E -->|No| R[Resultado exitoso]
    E -->|Sí| RE{¿Reintentable?}
    RE -->|No — error fatal| F[Fallback / resultado parcial]
    RE -->|Sí| RT{¿Intentos menor que max?}
    RT -->|Sí| D[Esperar backoff exponencial]
    D --> T
    RT -->|No — agotado| F
    F --> AG[Agregar al resultado con flag de error]

Implementación: Worker con reintentos y fallback

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Callable, Optional

class WorkerConReintentos:
    """Worker robusto con reintentos automáticos y fallback configurable."""

    def __init__(
        self,
        max_reintentos: int = 3,
        backoff_base: float = 1.0,
        timeout: float = 60.0
    ):
        self.max_reintentos = max_reintentos
        self.backoff_base = backoff_base
        self.timeout = timeout

    async def ejecutar(
        self,
        prompt: str,
        opciones: ClaudeCodeOptions,
        fallback: Optional[Callable] = None
    ) -> str:
        ultimo_error = None

        for intento in range(self.max_reintentos):
            try:
                resultado = await asyncio.wait_for(
                    self._ejecutar_query(prompt, opciones),
                    timeout=self.timeout
                )
                return resultado

            except asyncio.TimeoutError:
                ultimo_error = f"Timeout después de {self.timeout}s"
                print(f"[Worker] Intento {intento+1}: timeout")

            except Exception as e:
                ultimo_error = str(e)
                print(f"[Worker] Intento {intento+1}: error — {e}")

                # No reintentar ciertos errores fatales
                if "invalid_api_key" in str(e).lower():
                    raise

            # Backoff exponencial: 1s, 2s, 4s...
            if intento < self.max_reintentos - 1:
                espera = self.backoff_base * (2 ** intento)
                print(f"[Worker] Esperando {espera}s antes de reintentar...")
                await asyncio.sleep(espera)

        # Agotados los reintentos
        if fallback:
            print(f"[Worker] Usando fallback después de {self.max_reintentos} intentos")
            return await fallback(ultimo_error)

        raise RuntimeError(
            f"Worker falló después de {self.max_reintentos} intentos: {ultimo_error}"
        )

    async def _ejecutar_query(self, prompt: str, opciones: ClaudeCodeOptions) -> str:
        resultado = ""
        async for mensaje in query(prompt=prompt, options=opciones):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado = bloque.text
        return resultado

# Uso con fallback
worker = WorkerConReintentos(max_reintentos=3, timeout=30.0)

async def fallback_simple(error: str) -> str:
    return json.dumps({
        "error": "Worker no disponible",
        "razon": error,
        "resultado_parcial": None
    })

async def analisis_con_resiliencia(archivo: str) -> str:
    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=["Read"],
        max_turns=5,
        system_prompt="Analiza el archivo y reporta problemas."
    )
    return await worker.ejecutar(
        prompt=f"Analiza {archivo}",
        opciones=opciones,
        fallback=fallback_simple
    )

Circuit breaker para proteger el sistema

from enum import Enum
import time

class EstadoCircuit(Enum):
    CERRADO = "cerrado"      # Normal — peticiones fluyen
    ABIERTO = "abierto"      # Circuito roto — bloquea peticiones
    SEMI_ABIERTO = "semi_abierto"  # Probando si ya funciona

class CircuitBreaker:
    """Evita llamar a un worker que está fallando consistentemente."""

    def __init__(
        self,
        umbral_fallas: int = 5,
        tiempo_reset: float = 60.0
    ):
        self.umbral_fallas = umbral_fallas
        self.tiempo_reset = tiempo_reset
        self.fallas_consecutivas = 0
        self.ultimo_fallo = 0.0
        self.estado = EstadoCircuit.CERRADO

    def registrar_exito(self):
        self.fallas_consecutivas = 0
        self.estado = EstadoCircuit.CERRADO

    def registrar_fallo(self):
        self.fallas_consecutivas += 1
        self.ultimo_fallo = time.time()
        if self.fallas_consecutivas >= self.umbral_fallas:
            self.estado = EstadoCircuit.ABIERTO
            print(f"[CircuitBreaker] ABIERTO — demasiados fallos")

    def puede_pasar(self) -> bool:
        if self.estado == EstadoCircuit.CERRADO:
            return True

        if self.estado == EstadoCircuit.ABIERTO:
            # Verificar si ya pasó el tiempo de reset
            if time.time() - self.ultimo_fallo > self.tiempo_reset:
                self.estado = EstadoCircuit.SEMI_ABIERTO
                return True
            return False

        # SEMI_ABIERTO: dejar pasar una petición de prueba
        return True

# Uso global del circuit breaker
cb_worker_analisis = CircuitBreaker(umbral_fallas=3, tiempo_reset=30.0)

async def analisis_con_circuit_breaker(archivo: str) -> Optional[str]:
    if not cb_worker_analisis.puede_pasar():
        print(f"[CB] Worker bloqueado por circuit breaker — usando fallback")
        return None

    try:
        resultado = await analizar_archivo(archivo)
        cb_worker_analisis.registrar_exito()
        return resultado
    except Exception as e:
        cb_worker_analisis.registrar_fallo()
        print(f"[CB] Error registrado: {e}")
        return None

10. Límites y consideraciones

Profundidad máxima de anidación

El SDK no impone un límite técnico estricto de anidación, pero existen límites prácticos importantes:

graph TD
    N0[Nivel 0: Tu aplicación]
    N1[Nivel 1: Orquestador principal]
    N2[Nivel 2: Manager / Coordinador de área]
    N3[Nivel 3: Worker especializado]
    N4[Nivel 4: Sub-worker — EVITAR]
    STOP[Más niveles — NUNCA]

    N0 --> N1 --> N2 --> N3 --> N4 --> STOP

    style N4 fill:#ffa500,color:#000
    style STOP fill:#ff6b6b,color:#fff

Regla práctica: Máximo 3 niveles de anidación. Cada nivel adicional:

Modelo de costos

def estimar_costo_sistema_multiagente(config: dict) -> dict:
    """Estima el costo aproximado de un sistema multi-agente."""

    precios = {
        "claude-haiku-4-5": {"input": 0.00025, "output": 0.00125},
        "claude-opus-4-5": {"input": 0.015, "output": 0.075},
        "claude-sonnet-4-5": {"input": 0.003, "output": 0.015},
    }

    total_usd = 0.0
    desglose = []

    for worker in config.get("workers", []):
        modelo = worker["modelo"]
        tokens_input = worker["tokens_input_estimados"]
        tokens_output = worker["tokens_output_estimados"]
        cantidad = worker.get("cantidad", 1)

        precio = precios.get(modelo, precios["claude-haiku-4-5"])
        costo_unitario = (
            tokens_input / 1000 * precio["input"] +
            tokens_output / 1000 * precio["output"]
        )
        costo_total = costo_unitario * cantidad

        desglose.append({
            "worker": worker["nombre"],
            "modelo": modelo,
            "cantidad": cantidad,
            "costo_unitario_usd": round(costo_unitario, 6),
            "costo_total_usd": round(costo_total, 4),
        })
        total_usd += costo_total

    return {
        "total_usd": round(total_usd, 4),
        "desglose": desglose,
    }

# Ejemplo: estimar costo del fan-out de 50 archivos
config_fanout = {
    "workers": [
        {
            "nombre": "worker_analisis",
            "modelo": "claude-haiku-4-5",
            "tokens_input_estimados": 3000,
            "tokens_output_estimados": 500,
            "cantidad": 50,  # Un worker por archivo
        },
        {
            "nombre": "worker_reporte",
            "modelo": "claude-opus-4-5",
            "tokens_input_estimados": 10000,
            "tokens_output_estimados": 2000,
            "cantidad": 1,
        }
    ]
}

estimacion = estimar_costo_sistema_multiagente(config_fanout)
print(f"Costo estimado: ${estimacion['total_usd']} USD")
for item in estimacion['desglose']:
    print(f"  {item['worker']} x{item['cantidad']}: ${item['costo_total_usd']}")

Reglas prácticas de límites

ParámetroRecomendaciónRazón
max_turnsMáximo 20 por workerEvita bucles infinitos
Workers simultáneosMáximo 10Rate limits de API
Profundidad de anidaciónMáximo 3 nivelesDebugging y costo
Timeout por worker30-120 segundosEvita bloqueos
Tamaño de batch5-10 itemsBalance paralelismo/costo
Contexto por workerBajo 100K tokensCosto y velocidad

11. Ejemplo completo: Sistema de investigación multi-agente

Este sistema implementa un investigador que busca, analiza y redacta usando tres workers especializados con manejo de errores completo.

import asyncio
import json
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Hallazgo:
    titulo: str
    contenido: str
    fuente: str
    relevancia: int  # 1-10

@dataclass
class Investigacion:
    tema: str
    hallazgos: List[Hallazgo] = field(default_factory=list)
    analisis: str = ""
    reporte: str = ""
    errores: List[str] = field(default_factory=list)

async def worker_buscador(tema: str, directorio: str) -> List[Hallazgo]:
    """Busca información relevante en el sistema de archivos."""
    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=["Glob", "Grep", "Read"],
        disallowed_tools=["Bash", "Write", "Edit"],
        system_prompt=f"""Eres un investigador experto en código.
Busca TODA la información disponible sobre: "{tema}"
Revisa archivos markdown, código, configuraciones y documentación.
Retorna un JSON array de hallazgos con este formato exacto:
[
  {{
    "titulo": "Nombre descriptivo del hallazgo",
    "contenido": "Descripción detallada del contenido encontrado",
    "fuente": "ruta/al/archivo.ext",
    "relevancia": 8
  }}
]
Incluye solo hallazgos con relevancia mayor o igual a 5.
Retorna SOLO el JSON array, sin texto adicional.""",
        max_turns=15,
        cwd=directorio,
    )

    try:
        resultado = ""
        async for mensaje in query(
            prompt=f"Busca toda la información sobre '{tema}' en este proyecto.",
            options=opciones
        ):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado = bloque.text

        # Parsear JSON de hallazgos
        inicio = resultado.find('[')
        fin = resultado.rfind(']') + 1
        if inicio >= 0:
            hallazgos_raw = json.loads(resultado[inicio:fin])
            return [Hallazgo(**h) for h in hallazgos_raw]

    except (json.JSONDecodeError, TypeError, KeyError, Exception) as e:
        print(f"[Buscador] Error parseando hallazgos: {e}")

    # Fallback: retornar hallazgo genérico
    return [Hallazgo(
        titulo="Búsqueda general",
        contenido=resultado if resultado else "Sin resultados",
        fuente="múltiples archivos",
        relevancia=5
    )]

async def worker_analizador(tema: str, hallazgos: List[Hallazgo]) -> str:
    """Analiza los hallazgos y extrae insights profundos."""
    if not hallazgos:
        return "No hay hallazgos para analizar."

    hallazgos_texto = "\n\n".join([
        f"### {h.titulo} (Relevancia: {h.relevancia}/10)\n"
        f"Fuente: {h.fuente}\n{h.contenido}"
        for h in sorted(hallazgos, key=lambda x: x.relevancia, reverse=True)
    ])

    opciones = ClaudeCodeOptions(
        model="claude-opus-4-5",
        allowed_tools=[],
        system_prompt="""Eres un analista experto en sistemas de software.
Sintetizas información técnica compleja en insights accionables.
Analiza los hallazgos y produce un análisis estructurado con:
1. Patrones principales identificados
2. Contradicciones o inconsistencias
3. Brechas de información (qué falta saber)
4. Conclusiones clave con nivel de confianza
5. Riesgos identificados
Sé específico, técnico y concreto.""",
        max_turns=5,
    )

    resultado = ""
    async for mensaje in query(
        prompt=f"Analiza estos hallazgos sobre '{tema}':\n\n{hallazgos_texto}",
        options=opciones
    ):
        if hasattr(mensaje, 'content'):
            for bloque in mensaje.content:
                if hasattr(bloque, 'text'):
                    resultado = bloque.text

    return resultado

async def worker_redactor_investigacion(
    tema: str,
    hallazgos: List[Hallazgo],
    analisis: str
) -> str:
    """Redacta el reporte final de investigación."""
    opciones = ClaudeCodeOptions(
        model="claude-opus-4-5",
        allowed_tools=[],
        system_prompt="""Eres un redactor técnico senior que produce reportes ejecutivos.
El reporte debe tener:
- Título y fecha
- Resumen ejecutivo (máximo 3 párrafos)
- Metodología breve
- Hallazgos principales (con evidencia citada)
- Análisis e interpretación
- Recomendaciones concretas priorizadas (P0, P1, P2)
- Próximos pasos sugeridos
- Referencias (lista de fuentes)

Usa Markdown con encabezados, listas y tablas donde corresponda.
Sé conciso pero completo.""",
        max_turns=5,
    )

    hallazgos_resumen = "\n".join([
        f"- [{h.relevancia}/10] {h.titulo}: {h.fuente}"
        for h in hallazgos
    ])

    prompt = f"""Redacta un reporte de investigación completo sobre: "{tema}"

## Hallazgos recolectados:
{hallazgos_resumen}

## Análisis técnico:
{analisis}
"""

    resultado = ""
    async for mensaje in query(prompt=prompt, options=opciones):
        if hasattr(mensaje, 'content'):
            for bloque in mensaje.content:
                if hasattr(bloque, 'text'):
                    resultado = bloque.text

    return resultado

async def sistema_investigacion(tema: str, directorio: str) -> Investigacion:
    """
    Sistema completo de investigación multi-agente.
    Patrón: Hub-and-Spoke con dependencias secuenciales.
    Resiliencia: cada etapa tiene manejo de errores independiente.
    """
    investigacion = Investigacion(tema=tema)
    print(f"[Investigación] Iniciando: '{tema}'")

    # Paso 1: Búsqueda
    try:
        print("[Investigación] Buscando hallazgos...")
        investigacion.hallazgos = await asyncio.wait_for(
            worker_buscador(tema, directorio),
            timeout=120.0
        )
        print(f"[Investigación] Encontrados {len(investigacion.hallazgos)} hallazgos")
    except asyncio.TimeoutError:
        investigacion.errores.append("Búsqueda: timeout después de 120s")
        print("[Investigación] Advertencia: búsqueda agotó el timeout")

    if not investigacion.hallazgos:
        investigacion.reporte = "# Sin resultados\n\nNo se encontró información sobre el tema."
        return investigacion

    # Paso 2: Análisis
    try:
        print("[Investigación] Analizando hallazgos...")
        investigacion.analisis = await asyncio.wait_for(
            worker_analizador(tema, investigacion.hallazgos),
            timeout=60.0
        )
    except Exception as e:
        investigacion.errores.append(f"Análisis: {str(e)}")
        investigacion.analisis = "Análisis no disponible por error."
        print(f"[Investigación] Advertencia: análisis falló: {e}")

    # Paso 3: Redacción
    try:
        print("[Investigación] Redactando reporte...")
        investigacion.reporte = await asyncio.wait_for(
            worker_redactor_investigacion(
                tema,
                investigacion.hallazgos,
                investigacion.analisis
            ),
            timeout=60.0
        )
    except Exception as e:
        investigacion.errores.append(f"Redacción: {str(e)}")
        # Reporte de emergencia con los datos disponibles
        investigacion.reporte = (
            f"# Reporte: {tema}\n\n"
            f"## Hallazgos\n{hallazgos_resumen}\n\n"
            f"## Análisis\n{investigacion.analisis}"
        )

    print(f"[Investigación] Completado. Errores: {len(investigacion.errores)}")
    return investigacion

# Uso
async def main():
    resultado = await sistema_investigacion(
        tema="patrones de autenticación",
        directorio="/home/usuario/mi-proyecto"
    )
    print(resultado.reporte)
    if resultado.errores:
        print(f"\nAdvertencias: {resultado.errores}")

asyncio.run(main())

12. Ejemplo completo: Agente de refactoring paralelo

Procesa múltiples archivos en paralelo, con control de concurrencia y reporte detallado.

import asyncio
import json
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass
from typing import List

@dataclass
class ResultadoRefactoring:
    archivo: str
    cambios_realizados: List[str]
    mejoras: str
    exito: bool
    error: str = ""

async def refactorizar_archivo(ruta: str) -> ResultadoRefactoring:
    """Refactoriza un archivo Python individual."""
    opciones = ClaudeCodeOptions(
        model="claude-opus-4-5",
        allowed_tools=["Read", "Edit"],
        disallowed_tools=["Bash", "Write", "Glob", "Grep"],
        system_prompt="""Eres un experto en refactoring de Python.
Aplica SOLO estas mejoras cuando sea necesario:
1. Extrae funciones largas (>30 líneas) en funciones más pequeñas
2. Elimina código duplicado evidente
3. Mejora nombres de variables poco descriptivos (x, tmp, d, etc.)
4. Agrega type hints donde falten en funciones
5. Simplifica lógica booleana compleja

REGLAS CRÍTICAS:
- NO cambies la funcionalidad bajo ninguna circunstancia
- NO modifiques tests existentes
- Haz cambios conservadores y seguros
- Si el archivo ya está bien, NO toques nada

Al finalizar, retorna SOLO este JSON:
{"cambios": ["descripcion del cambio 1", "descripcion 2"], "mejoras": "resumen en una oración"}
Si no hiciste cambios: {"cambios": [], "mejoras": "Archivo ya cumple los estándares"}""",
        max_turns=10,
    )

    try:
        resultado = ""
        async for mensaje in query(
            prompt=f"Refactoriza este archivo: {ruta}",
            options=opciones
        ):
            if hasattr(mensaje, 'content'):
                for bloque in mensaje.content:
                    if hasattr(bloque, 'text'):
                        resultado = bloque.text

        # Extraer JSON de cambios del final del resultado
        cambios = []
        mejoras = ""
        try:
            inicio = resultado.rfind('{"cambios"')
            if inicio >= 0:
                json_str = resultado[inicio:resultado.rfind('}')+1]
                datos = json.loads(json_str)
                cambios = datos.get("cambios", [])
                mejoras = datos.get("mejoras", "")
        except (json.JSONDecodeError, ValueError):
            # Si no hay JSON, el resultado mismo es la descripción
            mejoras = resultado[-300:] if len(resultado) > 300 else resultado

        return ResultadoRefactoring(
            archivo=ruta,
            cambios_realizados=cambios,
            mejoras=mejoras,
            exito=True
        )

    except Exception as e:
        return ResultadoRefactoring(
            archivo=ruta,
            cambios_realizados=[],
            mejoras="",
            exito=False,
            error=str(e)
        )

async def refactoring_paralelo(
    directorio: str,
    patron: str = "**/*.py",
    max_concurrente: int = 4,
    excluir: List[str] = None
) -> dict:
    """Refactoriza todos los archivos Python en paralelo con control de concurrencia."""
    excluir = excluir or ["__pycache__", "test_", "migrations", ".venv"]

    archivos = [
        str(p) for p in Path(directorio).glob(patron)
        if not any(excl in str(p) for excl in excluir)
    ]

    print(f"[Refactoring] Procesando {len(archivos)} archivos, {max_concurrente} concurrentes")

    semaforo = asyncio.Semaphore(max_concurrente)

    async def procesar_con_limite(archivo: str) -> ResultadoRefactoring:
        async with semaforo:
            nombre = Path(archivo).name
            print(f"[Refactoring] Iniciando: {nombre}")
            result = await refactorizar_archivo(archivo)
            estado = "OK" if result.exito else "ERROR"
            cambios = len(result.cambios_realizados)
            print(f"[Refactoring] {estado} {nombre} ({cambios} cambios)")
            return result

    tareas = [procesar_con_limite(arch) for arch in archivos]
    resultados = await asyncio.gather(*tareas)

    exitosos = [r for r in resultados if r.exito]
    fallidos = [r for r in resultados if not r.exito]
    modificados = [r for r in exitosos if r.cambios_realizados]
    total_cambios = sum(len(r.cambios_realizados) for r in exitosos)

    return {
        "total_archivos": len(archivos),
        "exitosos": len(exitosos),
        "fallidos": len(fallidos),
        "archivos_modificados": len(modificados),
        "total_cambios_aplicados": total_cambios,
        "archivos_sin_cambios": len(exitosos) - len(modificados),
        "cambios_por_archivo": [
            {"archivo": r.archivo, "cambios": r.cambios_realizados}
            for r in modificados
        ],
        "errores": [
            {"archivo": r.archivo, "error": r.error}
            for r in fallidos
        ],
    }

async def main():
    stats = await refactoring_paralelo(
        directorio="/mi/proyecto/src",
        max_concurrente=3
    )
    print(f"\n=== Resultado del Refactoring ===")
    print(f"Archivos procesados: {stats['exitosos']}/{stats['total_archivos']}")
    print(f"Archivos modificados: {stats['archivos_modificados']}")
    print(f"Total cambios: {stats['total_cambios_aplicados']}")
    if stats['fallidos'] > 0:
        print(f"Errores: {stats['fallidos']}")
        for err in stats['errores']:
            print(f"  - {err['archivo']}: {err['error']}")

asyncio.run(main())

13. Ejemplo completo: Pipeline de CI/CD

Un sistema multi-agente que coordina el pipeline completo: análisis → tests → reporte.

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict

class EstadoEtapa(Enum):
    PENDIENTE = "pendiente"
    EN_PROGRESO = "en_progreso"
    EXITOSO = "exitoso"
    FALLIDO = "fallido"
    OMITIDO = "omitido"

@dataclass
class EtapaCICD:
    nombre: str
    descripcion: str
    estado: EstadoEtapa = EstadoEtapa.PENDIENTE
    resultado: str = ""
    duracion_segundos: float = 0.0
    error: str = ""
    es_critica: bool = True  # Si falla, detiene el pipeline

async def ejecutar_analisis_estatico(directorio: str) -> EtapaCICD:
    """Análisis estático: linting, type checking, format."""
    import time
    etapa = EtapaCICD(
        nombre="analisis_estatico",
        descripcion="Análisis estático de código",
        es_critica=False  # Advertencias no detienen el pipeline
    )
    etapa.estado = EstadoEtapa.EN_PROGRESO
    inicio = time.time()

    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=["Bash", "Glob", "Read"],
        disallowed_tools=["Write", "Edit"],
        system_prompt="""Ejecuta análisis estático del código Python.
Verifica:
1. Sintaxis correcta (python -m py_compile)
2. Imports innecesarios o faltantes
3. Variables no utilizadas
4. Funciones demasiado largas (>50 líneas)

Retorna JSON: {"aprobado": true/false, "advertencias": [], "errores": []}
Sé estricto pero justo.""",
        max_turns=8,
        cwd=directorio,
    )

    try:
        resultado = ""
        async for m in query(
            prompt="Ejecuta análisis estático completo del proyecto Python.",
            options=opciones
        ):
            if hasattr(m, 'content'):
                for b in m.content:
                    if hasattr(b, 'text'):
                        resultado = b.text

        etapa.resultado = resultado
        etapa.estado = EstadoEtapa.EXITOSO
    except Exception as e:
        etapa.error = str(e)
        etapa.estado = EstadoEtapa.FALLIDO

    etapa.duracion_segundos = time.time() - inicio
    return etapa

async def ejecutar_tests(directorio: str) -> EtapaCICD:
    """Ejecuta la suite de tests con pytest."""
    import time
    etapa = EtapaCICD(
        nombre="tests",
        descripcion="Suite de tests unitarios e integración",
        es_critica=True  # Tests fallidos detienen el pipeline
    )
    etapa.estado = EstadoEtapa.EN_PROGRESO
    inicio = time.time()

    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=["Bash"],
        system_prompt="""Ejecuta los tests del proyecto.
Detecta automáticamente el framework (pytest, unittest, jest, etc.).
Ejecuta los tests y reporta resultados.
Retorna JSON:
{
  "aprobado": true/false,
  "tests_pasados": N,
  "tests_fallidos": N,
  "cobertura": "X%",
  "tests_fallidos_detalle": ["test_name: error message"]
}""",
        max_turns=5,
        cwd=directorio,
    )

    try:
        resultado = ""
        async for m in query(
            prompt="Ejecuta todos los tests y reporta los resultados en JSON.",
            options=opciones
        ):
            if hasattr(m, 'content'):
                for b in m.content:
                    if hasattr(b, 'text'):
                        resultado = b.text

        etapa.resultado = resultado
        etapa.estado = EstadoEtapa.EXITOSO
    except Exception as e:
        etapa.error = str(e)
        etapa.estado = EstadoEtapa.FALLIDO

    etapa.duracion_segundos = time.time() - inicio
    return etapa

async def ejecutar_revision_seguridad(directorio: str) -> EtapaCICD:
    """Revisión básica de seguridad."""
    import time
    etapa = EtapaCICD(
        nombre="seguridad",
        descripcion="Revisión de vulnerabilidades de seguridad",
        es_critica=False
    )
    etapa.estado = EstadoEtapa.EN_PROGRESO
    inicio = time.time()

    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=["Grep", "Read", "Glob"],
        disallowed_tools=["Bash", "Write", "Edit"],
        system_prompt="""Realiza una revisión básica de seguridad.
Busca:
- Secrets hardcodeados (passwords, API keys, tokens)
- SQL injection potencial
- Uso de eval() o exec()
- Dependencias con vulnerabilidades conocidas

Retorna JSON: {"aprobado": true/false, "problemas_criticos": [], "advertencias": []}""",
        max_turns=8,
        cwd=directorio,
    )

    try:
        resultado = ""
        async for m in query(
            prompt="Realiza revisión de seguridad del código fuente.",
            options=opciones
        ):
            if hasattr(m, 'content'):
                for b in m.content:
                    if hasattr(b, 'text'):
                        resultado = b.text

        etapa.resultado = resultado
        etapa.estado = EstadoEtapa.EXITOSO
    except Exception as e:
        etapa.error = str(e)
        etapa.estado = EstadoEtapa.FALLIDO

    etapa.duracion_segundos = time.time() - inicio
    return etapa

async def generar_reporte_pipeline(etapas: List[EtapaCICD], proyecto: str) -> str:
    """Genera reporte final del pipeline."""
    opciones = ClaudeCodeOptions(
        model="claude-haiku-4-5",
        allowed_tools=[],
        system_prompt="""Genera un reporte de CI/CD conciso en Markdown.
Incluye: estado general (PASS/FAIL), tabla de etapas, problemas críticos, recomendaciones.""",
        max_turns=3,
    )

    resumen_etapas = "\n".join([
        f"- {e.nombre}: {e.estado.value} ({e.duracion_segundos:.1f}s)\n  {e.resultado[:200] if e.resultado else e.error}"
        for e in etapas
    ])

    resultado = ""
    async for m in query(
        prompt=f"Genera reporte de CI/CD para '{proyecto}':\n\n{resumen_etapas}",
        options=opciones
    ):
        if hasattr(m, 'content'):
            for b in m.content:
                if hasattr(b, 'text'):
                    resultado = b.text

    return resultado

async def pipeline_cicd_completo(directorio: str, proyecto: str = "mi-proyecto") -> dict:
    """
    Pipeline CI/CD multi-agente completo.
    Análisis y seguridad en paralelo, luego tests, luego reporte.
    """
    import time
    print(f"[CI/CD] Iniciando pipeline para: {proyecto}")
    inicio_total = time.time()
    etapas: List[EtapaCICD] = []

    # FASE 1: Análisis y seguridad en paralelo (no bloquean el build)
    print("[CI/CD] Fase 1: Análisis paralelo...")
    analisis, seguridad = await asyncio.gather(
        ejecutar_analisis_estatico(directorio),
        ejecutar_revision_seguridad(directorio)
    )
    etapas.extend([analisis, seguridad])

    # FASE 2: Tests (críticos — si fallan, se reporta pero no bloquea)
    print("[CI/CD] Fase 2: Tests...")
    tests = await ejecutar_tests(directorio)
    etapas.append(tests)

    # FASE 3: Reporte final
    print("[CI/CD] Fase 3: Generando reporte...")
    reporte = await generar_reporte_pipeline(etapas, proyecto)

    duracion_total = time.time() - inicio_total
    estado_global = all(
        e.estado == EstadoEtapa.EXITOSO
        for e in etapas
        if e.es_critica
    )

    print(f"[CI/CD] Pipeline completado en {duracion_total:.1f}s — {'PASS' if estado_global else 'FAIL'}")

    return {
        "estado": "PASS" if estado_global else "FAIL",
        "duracion_total": round(duracion_total, 1),
        "etapas": [
            {
                "nombre": e.nombre,
                "estado": e.estado.value,
                "duracion": round(e.duracion_segundos, 1),
                "critica": e.es_critica
            }
            for e in etapas
        ],
        "reporte": reporte,
    }

asyncio.run(pipeline_cicd_completo("/mi/proyecto", "mi-api"))

14. Testing de sistemas multi-agente

Mocking de subagentes en Python

import pytest
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock

def crear_mock_query(respuesta: str):
    """Crea un mock del generador async de query()."""
    class MensajeMock:
        def __init__(self, texto):
            self.content = [MagicMock(text=texto)]

    async def _generador(*args, **kwargs):
        yield MensajeMock(respuesta)

    return _generador

@pytest.mark.asyncio
async def test_worker_buscador_parsea_json_correctamente():
    """Verifica que el worker parsea JSON de hallazgos."""
    respuesta_mock = json.dumps([{
        "titulo": "Configuración de auth",
        "contenido": "Usa JWT con RS256",
        "fuente": "auth/config.py",
        "relevancia": 9
    }])

    with patch('claude_code_sdk.query', crear_mock_query(respuesta_mock)):
        hallazgos = await worker_buscador("autenticación", "/proyecto")

    assert len(hallazgos) == 1
    assert hallazgos[0].titulo == "Configuración de auth"
    assert hallazgos[0].relevancia == 9

@pytest.mark.asyncio
async def test_worker_buscador_maneja_json_invalido():
    """Verifica fallback cuando el JSON está malformado."""
    with patch('claude_code_sdk.query', crear_mock_query("No hay JSON aquí, solo texto")):
        hallazgos = await worker_buscador("tema", "/proyecto")

    # Debe retornar al menos un hallazgo fallback
    assert len(hallazgos) >= 1

@pytest.mark.asyncio
async def test_coordinador_continua_si_analisis_falla():
    """Verifica que el sistema produce reporte aunque el análisis falle."""
    call_count = 0

    def query_que_falla_en_analisis(*args, **kwargs):
        nonlocal call_count
        call_count += 1

        if call_count == 2:  # Segunda llamada = análisis
            async def _gen():
                raise Exception("Error de análisis simulado")
                yield  # Hace al generador válido
            return _gen()
        else:
            return crear_mock_query("Resultado genérico")(*args, **kwargs)

    with patch('claude_code_sdk.query', query_que_falla_en_analisis):
        investigacion = await sistema_investigacion("tema", "/proyecto")

    assert investigacion.reporte != ""  # Debe tener reporte aunque sea de fallback
    assert len(investigacion.errores) > 0  # Debe registrar el error

@pytest.mark.asyncio
async def test_fan_out_respeta_limite_concurrencia():
    """Verifica que el fan-out no supera la concurrencia máxima."""
    concurrentes_actuales = 0
    max_concurrentes = 0

    original_analizar = analizar_archivo

    async def analizar_con_contador(ruta: str):
        nonlocal concurrentes_actuales, max_concurrentes
        concurrentes_actuales += 1
        max_concurrentes = max(max_concurrentes, concurrentes_actuales)
        await asyncio.sleep(0.01)  # Simular trabajo
        concurrentes_actuales -= 1
        return {"archivo": ruta, "problemas": [], "puntuacion": 8, "resumen": "OK"}

    archivos = [f"archivo_{i}.py" for i in range(10)]

    with patch('__main__.analizar_archivo', analizar_con_contador):
        await fan_out_controlado(archivos, analizar_con_contador, max_concurrente=3)

    assert max_concurrentes <= 3, f"Se excedió la concurrencia: {max_concurrentes}"

TypeScript: Testing con Jest y mocks

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

jest.mock("@anthropic-ai/claude-code-sdk");
const mockQuery = query as jest.MockedFunction<typeof query>;

function crearMockQuery(respuesta: string) {
  return async function* () {
    yield {
      type: "assistant" as const,
      message: {
        content: [{ type: "text" as const, text: respuesta }],
      },
    };
  };
}

describe("Sistema multi-agente", () => {
  beforeEach(() => jest.clearAllMocks());

  describe("workerInvestigacion", () => {
    it("retorna resultado exitoso cuando el query funciona", async () => {
      mockQuery.mockImplementation(crearMockQuery("Estructura del proyecto") as any);

      const resultado = await workerInvestigacion("/proyecto");

      expect(resultado.success).toBe(true);
      expect(resultado.result).toBe("Estructura del proyecto");
      expect(mockQuery).toHaveBeenCalledTimes(1);
    });

    it("retorna error cuando el query falla", async () => {
      mockQuery.mockImplementation(async function* () {
        throw new Error("API Error");
      } as any);

      const resultado = await workerInvestigacion("/proyecto");

      expect(resultado.success).toBe(false);
      expect(resultado.error).toContain("API Error");
    });

    it("usa las opciones correctas para el modelo liviano", async () => {
      mockQuery.mockImplementation(crearMockQuery("resultado") as any);

      await workerInvestigacion("/proyecto");

      const llamadaOpciones = (mockQuery.mock.calls[0][0] as any).options;
      expect(llamadaOpciones.model).toBe("claude-haiku-4-5");
      expect(llamadaOpciones.disallowedTools).toContain("Bash");
      expect(llamadaOpciones.disallowedTools).toContain("Write");
    });
  });

  describe("coordinadorHub", () => {
    it("completa el flujo completo cuando todos los workers funcionan", async () => {
      let llamadas = 0;
      mockQuery.mockImplementation(async function* () {
        llamadas++;
        yield {
          type: "assistant" as const,
          message: { content: [{ type: "text" as const, text: `Resultado ${llamadas}` }] },
        };
      } as any);

      const resultado = await coordinadorHub("/proyecto");

      expect(resultado).toBeTruthy();
      expect(llamadas).toBe(3); // investigación + análisis + redacción
    });
  });
});

15. Anti-patrones a evitar

Anti-patrón 1: Demasiados niveles de anidación

# MAL: 4 niveles — imposible de depurar y muy caro
async def orquestador_nivel_1():
    async def orquestador_nivel_2():
        async def orquestador_nivel_3():
            async def worker_nivel_4():
                pass  # El costo se multiplica exponencialmente

# BIEN: máximo 2-3 niveles, workers directos desde el coordinador
async def orquestador():
    resultados = await asyncio.gather(
        worker_a(),
        worker_b(),
        worker_c()
    )

Anti-patrón 2: Workers sin límite de herramientas

# MAL: worker con acceso total al sistema sin justificación
opciones_inseguras = ClaudeCodeOptions(
    # Sin restricciones de herramientas
    max_turns=100,  # Sin límite práctico — puede ejecutarse por horas
)

# BIEN: principio de mínimo privilegio
opciones_seguras = ClaudeCodeOptions(
    allowed_tools=["Read", "Glob"],  # Solo lo estrictamente necesario
    disallowed_tools=["Bash", "Write", "Edit"],  # Bloquear explícitamente lo peligroso
    max_turns=10,  # Límite razonable para la tarea
)

Anti-patrón 3: Ignorar errores de workers

# MAL: falla silenciosa — produce resultados incorrectos sin saberlo
async def orquestador_malo():
    resultado_1 = await worker_1()
    resultado_2 = await worker_2()
    return consolidar(resultado_1, resultado_2)
    # Si cualquier worker retorna None o string vacío, consolidar falla silenciosamente

# BIEN: manejo explícito con decisiones claras
async def orquestador_bueno():
    resultado_1 = await worker_1()
    if not resultado_1.exito:
        if resultado_1.es_critico:
            raise RuntimeError(f"Worker crítico falló: {resultado_1.error}")
        else:
            print(f"Advertencia: worker no crítico falló: {resultado_1.error}")
            resultado_1.valor = "No disponible"

    resultado_2 = await worker_2()
    return consolidar(resultado_1.valor, resultado_2.valor if resultado_2.exito else "N/A")

Anti-patrón 4: System prompt demasiado genérico

# MAL: system prompt genérico que no especializa al worker
opciones_genericas = ClaudeCodeOptions(
    system_prompt="Eres un asistente de código útil."
    # El worker no sabe qué hacer, en qué formato retornar, ni qué NO hacer
)

# BIEN: system prompt específico con formato de output, rol claro y restricciones
opciones_especializadas = ClaudeCodeOptions(
    system_prompt="""Eres un especialista en análisis de seguridad OWASP.
Tu ÚNICO objetivo: detectar vulnerabilidades OWASP Top 10 en código Python.
Formato de output OBLIGATORIO:
{"vulnerabilidades": [{"tipo": "A01|A02|...", "archivo": "...", "linea": N, "descripcion": "...", "severidad": "alta|media|baja"}]}
Si no hay vulnerabilidades: {"vulnerabilidades": []}
NO hagas recomendaciones fuera del JSON.
NO modifiques ningún archivo.
SOLO lee y analiza."""
)

Anti-patrón 5: Deadlocks por dependencias circulares

graph LR
    A[Worker A espera resultado de B] -->|bloquea| B[Worker B espera resultado de A]
    B -->|bloquea| A
    style A fill:#ff6b6b,color:#fff
    style B fill:#ff6b6b,color:#fff
# MAL: dependencias circulares — deadlock garantizado
async def worker_a(estado):
    resultado_b = await esperar_resultado(estado, "worker_b")  # Espera B
    return procesar(resultado_b)

async def worker_b(estado):
    resultado_a = await esperar_resultado(estado, "worker_a")  # Espera A — DEADLOCK
    return procesar(resultado_a)

# BIEN: dependencias unidireccionales
async def coordinador_sin_deadlock():
    resultado_a = await worker_a()          # A ejecuta primero
    resultado_b = await worker_b(resultado_a)  # B usa resultado de A
    return combinar(resultado_a, resultado_b)

Anti-patrón 6: No controlar la concurrencia

# MAL: lanza todos los workers a la vez sin límite
archivos = obtener_1000_archivos()
tareas = [analizar_archivo(f) for f in archivos]
resultados = await asyncio.gather(*tareas)
# Lanza 1000 llamadas a la API simultáneamente — rate limit inmediato

# BIEN: controlar concurrencia con semáforo
semaforo = asyncio.Semaphore(5)  # Máximo 5 concurrentes

async def analizar_con_limite(archivo):
    async with semaforo:
        return await analizar_archivo(archivo)

tareas = [analizar_con_limite(f) for f in archivos]
resultados = await asyncio.gather(*tareas)

Anti-patrón 7: Pasar contexto innecesario a los workers

# MAL: pasar todo el código fuente a un worker que solo necesita nombres de archivo
await worker_analisis(
    contexto=todo_el_codigo_fuente_del_proyecto,  # 100K tokens innecesarios
    tarea="encuentra archivos Python"
)

# BIEN: pasar solo el contexto necesario para la tarea
archivos = await worker_explorador()  # Primero obtener la lista
await worker_analisis(
    contexto=archivos,  # Solo la lista de rutas, no el contenido
    tarea="analiza estos archivos"
)

Resumen del capítulo

Los sistemas multi-agente permiten escalar la capacidad de resolución de problemas al dividir trabajo complejo entre agentes especializados.

mindmap
  root((Subagentes y Orquestación))
    Patrones
      Hub-and-Spoke
        Workers especializados
        Síntesis central
      Pipeline
        Transformación secuencial
        Cada etapa transforma la anterior
      Fan-out Fan-in
        Máximo paralelismo
        Consolidación de resultados
      Jerárquico
        Múltiples niveles de coordinación
    Configuración
      allowed_tools mínimo privilegio
      max_turns siempre definir
      system_prompt específico y con formato
      model correcto para cada tarea
    Comunicación
      Estado compartido en memoria
      Output como input
      Sin comunicación directa entre workers
    Resiliencia
      Reintentos con backoff exponencial
      Circuit breaker
      Fallbacks para errores no críticos
      Timeouts en todas las llamadas
    Testing
      Mock de la función query
      Tests unitarios por worker
      Tests de integración del orquestador
      Verificar manejo de errores
    Anti-patrones
      Más de 3 niveles de anidación
      Sin límites de herramientas
      Ignorar errores de workers
      Deadlocks circulares
      Sin control de concurrencia

Próximo capítulo: Manejo de sesiones y contexto persistente entre múltiples queries.