Capítulo 19: Patrones Multi-Agente Avanzados

Por: Artiko
claudeagent-sdkmulti-agentepatronesorquestacion

Capítulo 19: Patrones Multi-Agente Avanzados

Los sistemas multi-agente simples (hub-and-spoke, pipeline, paralelo) tienen límites bien definidos. Para problemas que requieren razonamiento profundo, validación rigurosa o especialización extrema, necesitamos patrones más sofisticados. Este capítulo cubre los patrones avanzados que emergen cuando múltiples agentes colaboran de formas no triviales.


1. Más Allá del Hub-and-Spoke

Limitaciones de los Patrones Básicos

Los patrones del capítulo anterior — orquestador central con subagentes — tienen limitaciones importantes:

flowchart TB
    subgraph Basico["Patrón Básico (Hub-and-Spoke)"]
        O["Orquestador"] --> A1["Agente 1"]
        O --> A2["Agente 2"]
        O --> A3["Agente 3"]
    end

    subgraph Limitaciones["Limitaciones"]
        L1["Sin verificación\nindependiente"]
        L2["Sin mejora\niterativa"]
        L3["Sin perspectivas\ncontradictorias"]
        L4["Sin especialización\nreal"]
    end

    Basico -.-> Limitaciones

¿Cuándo necesitas patrones más complejos?

SituaciónPatrón recomendado
El código generado debe ser correcto, no solo plausibleReflection o Verificación cruzada
Decisiones de arquitectura con trade-offs importantesDebate
Múltiples enfoques posibles, necesitas el mejorEnsemble con voting
El agente comete errores sistemáticosSelf-improvement loop
Tareas muy distintas requieren expertise especializadoRouting con especialistas
Agentes en paralelo sin coordinador explícitoComunicación por artefactos
El humano debe aprobar decisiones críticasSupervisión humana escalable
Los agentes deben recordar conocimiento acumuladoMemoria compartida con vector DB

Taxonomía de Patrones Multi-Agente

flowchart TB
    Patrones["Patrones Multi-Agente"] --> Iterativos["Iterativos\n(mejora en ciclos)"]
    Patrones --> Colaborativos["Colaborativos\n(múltiples perspectivas)"]
    Patrones --> Especializados["Especializados\n(routing por capacidad)"]
    Patrones --> Distribuidos["Distribuidos\n(sin coordinador central)"]
    Patrones --> Adaptativos["Adaptativos\n(aprenden de errores)"]

    Iterativos --> Reflection["Reflection"]
    Iterativos --> SelfImprovement["Self-Improvement"]

    Colaborativos --> Debate["Debate"]
    Colaborativos --> Ensemble["Ensemble + Voting"]
    Colaborativos --> CrossVerification["Verificación Cruzada"]

    Especializados --> Router["Router + Especialistas"]
    Distribuidos --> ArtifactComm["Comunicación por Artefactos"]
    Adaptativos --> SharedMemory["Memoria Compartida"]

2. Patrón Reflection

Concepto: Generar → Criticar → Mejorar

El patrón Reflection es la forma más poderosa de mejorar la calidad de outputs cuando la calidad importa más que la velocidad. Un agente genera, otro critica con rigor, y el primero mejora basado en la crítica.

sequenceDiagram
    participant User
    participant Generator as Agente Generador
    participant Critic as Agente Crítico
    participant Judge as Juez de Calidad

    User->>Generator: Tarea inicial

    loop Ciclo de Reflexión (N iteraciones)
        Generator->>Generator: Genera solución v(N)
        Generator->>Critic: Entrega solución v(N)
        Critic->>Critic: Analiza fallas, inconsistencias
        Critic->>Generator: Critica detallada
        Generator->>Generator: Mejora basada en crítica
        Generator->>Judge: Solución mejorada v(N+1)
        Judge->>Judge: ¿Calidad suficiente?
        Judge-->>Generator: No suficiente → continuar
        Judge-->>User: Suficiente → retornar resultado
    end

Implementación Python

# reflection_pattern.py
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class ReflectionConfig:
    max_iterations: int = 3
    quality_threshold: float = 0.8
    generator_tools: list[str] = None
    critic_tools: list[str] = None

    def __post_init__(self):
        if self.generator_tools is None:
            self.generator_tools = ["Read", "Edit", "Bash"]
        if self.critic_tools is None:
            self.critic_tools = ["Read", "Bash"]


async def _run_agent(prompt: str, tools: list[str], max_turns: int = 30) -> str:
    """Ejecuta un agente y retorna el resultado como string."""
    results = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(
            allowed_tools=tools,
            max_turns=max_turns
        )
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def reflection_pattern(
    task: str,
    config: ReflectionConfig | None = None
) -> str:
    """
    Patrón Reflection: genera, critica y mejora iterativamente.

    Args:
        task: Descripción de la tarea a resolver
        config: Configuración del ciclo de reflexión

    Returns:
        Solución mejorada tras N iteraciones de reflexión
    """
    if config is None:
        config = ReflectionConfig()

    print(f"[Reflection] Iniciando con tarea: {task[:80]}...")

    # Paso 1: Generación inicial
    generator_prompt = f"""
Tarea: {task}

Produce una solución completa y detallada. No omitas pasos importantes.
Piensa paso a paso antes de escribir la solución.
"""
    current_solution = await _run_agent(
        generator_prompt,
        config.generator_tools
    )
    print(f"[Reflection] Solución inicial generada ({len(current_solution)} chars)")

    # Ciclo de reflexión
    for iteration in range(config.max_iterations):
        print(f"[Reflection] Iteración {iteration + 1}/{config.max_iterations}")

        # Paso 2: Crítica
        critic_prompt = f"""
Tarea original: {task}

Solución propuesta:
{current_solution}

Tu rol es ser un CRÍTICO RIGUROSO. Analiza la solución con estos criterios:
1. Correctitud: ¿Es técnicamente correcta? ¿Hay bugs o errores lógicos?
2. Completitud: ¿Cubre todos los casos de la tarea? ¿Hay edge cases faltantes?
3. Calidad: ¿Sigue buenas prácticas? ¿Es mantenible y legible?
4. Seguridad: ¿Hay vulnerabilidades o problemas de seguridad?
5. Performance: ¿Hay problemas de rendimiento obvios?

Para CADA problema encontrado:
- Describe el problema específicamente (no en general)
- Indica la línea o sección afectada
- Sugiere cómo corregirlo

Si la solución es excelente, di "APROBADO" claramente.
Si hay problemas menores, di "APROBADO CON MEJORAS" y lista las mejoras.
Si hay problemas serios, di "RECHAZADO" y explica qué debe cambiar.
"""
        critique = await _run_agent(
            critic_prompt,
            config.critic_tools,
            max_turns=15
        )
        print(f"[Reflection] Crítica: {critique[:200]}...")

        # Verificar si la calidad es suficiente
        is_approved = (
            "APROBADO" in critique.upper() and
            "RECHAZADO" not in critique.upper()
        )

        if is_approved:
            print(f"[Reflection] Aprobado en iteración {iteration + 1}")
            break

        # Paso 3: Mejora basada en crítica
        improve_prompt = f"""
Tarea original: {task}

Tu solución anterior:
{current_solution}

Crítica recibida:
{critique}

Mejora tu solución atendiendo TODOS los puntos de la crítica.
Mantén los aspectos que el crítico consideró correctos.
Para cada punto criticado, aplica la corrección sugerida.
Produce la versión mejorada completa (no solo los cambios).
"""
        current_solution = await _run_agent(
            improve_prompt,
            config.generator_tools
        )
        print(f"[Reflection] Solución mejorada ({len(current_solution)} chars)")

    return current_solution


# ============================================================
# Ejemplo: Reflection para código crítico
# ============================================================

async def main():
    task = """
    Escribe una función Python que parsee y valide JWTs.
    Debe verificar la firma, expiración, y claims obligatorios.
    Debe manejar todos los casos de error correctamente.
    """

    config = ReflectionConfig(
        max_iterations=3,
        generator_tools=["Bash"],  # Solo Bash para generar código
        critic_tools=["Bash"]
    )

    result = await reflection_pattern(task, config)
    print("\n=== RESULTADO FINAL ===")
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

Implementación TypeScript

// reflection-pattern.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface ReflectionConfig {
  maxIterations: number;
  generatorTools: string[];
  criticTools: string[];
}

const DEFAULT_CONFIG: ReflectionConfig = {
  maxIterations: 3,
  generatorTools: ["Read", "Edit", "Bash"],
  criticTools: ["Read", "Bash"],
};

async function runAgent(
  prompt: string,
  tools: string[],
  maxTurns = 30
): Promise<string> {
  const results: string[] = [];
  for await (const message of query(prompt, {
    allowedTools: tools,
    maxTurns,
  } as ClaudeCodeOptions)) {
    if ("content" in message && message.content) {
      results.push(String(message.content));
    }
  }
  return results.join("\n");
}

async function reflectionPattern(
  task: string,
  config: ReflectionConfig = DEFAULT_CONFIG
): Promise<string> {
  console.log(`[Reflection] Tarea: ${task.slice(0, 80)}...`);

  // Generación inicial
  let currentSolution = await runAgent(
    `Tarea: ${task}\n\nProduce una solución completa y detallada.`,
    config.generatorTools
  );

  for (let i = 0; i < config.maxIterations; i++) {
    console.log(`[Reflection] Iteración ${i + 1}/${config.maxIterations}`);

    // Crítica
    const critique = await runAgent(
      `Tarea: ${task}\n\nSolución propuesta:\n${currentSolution}\n\n` +
        `Analiza críticamente. Marca APROBADO o RECHAZADO.`,
      config.criticTools,
      15
    );

    const approved =
      critique.toUpperCase().includes("APROBADO") &&
      !critique.toUpperCase().includes("RECHAZADO");

    if (approved) {
      console.log(`[Reflection] Aprobado en iteración ${i + 1}`);
      break;
    }

    // Mejora
    currentSolution = await runAgent(
      `Tarea: ${task}\n\nSolución anterior:\n${currentSolution}\n\n` +
        `Crítica:\n${critique}\n\nMejora la solución atendiendo todos los puntos.`,
      config.generatorTools
    );
  }

  return currentSolution;
}

export { reflectionPattern, ReflectionConfig };

3. Patrón Debate

Múltiples Agentes Argumentando Posiciones

El patrón Debate es ideal para decisiones que tienen trade-offs reales donde no hay una respuesta objetivamente correcta: elección de tecnología, decisiones de arquitectura, estrategias de producto.

flowchart TD
    Question["Pregunta / Decisión"] --> Pro["Agente PRO\n(defiende opción A)"]
    Question --> Con["Agente CONTRA\n(defiende opción B)"]

    Pro --> |"Argumento 1"| Round1["Ronda 1"]
    Con --> |"Contraargumento 1"| Round1

    Round1 --> |"Rebatir"| Pro
    Round1 --> |"Rebatir"| Con

    Pro --> |"Argumento 2"| Round2["Ronda 2"]
    Con --> |"Contraargumento 2"| Round2

    Round2 --> Arbitro["Árbitro\n(analiza ambas posiciones)"]
    Arbitro --> Decision["Decisión final\ncon justificación"]

Implementación Python

# debate_pattern.py
import asyncio
from dataclasses import dataclass
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class DebateConfig:
    rounds: int = 2
    tools: list[str] = None

    def __post_init__(self):
        if self.tools is None:
            self.tools = ["Bash"]  # Solo lectura/análisis, no modificar archivos


async def _agent_respond(prompt: str, tools: list[str]) -> str:
    """Respuesta de un agente."""
    results = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=10)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def debate_pattern(
    question: str,
    option_a: str,
    option_b: str,
    config: DebateConfig | None = None
) -> dict:
    """
    Debate entre dos posiciones con árbitro final.

    Returns:
        dict con 'winner', 'reasoning', 'transcript'
    """
    if config is None:
        config = DebateConfig()

    transcript = []

    # Posiciones iniciales
    pro_prompt = f"""
Eres el DEFENSOR de la opción: "{option_a}".
Debes argumentar a favor de esta opción frente a "{option_b}".
La pregunta es: {question}

Presenta tus 3 mejores argumentos. Sé específico con datos y ejemplos reales.
No seas genérico. Conoce las debilidades de tu posición y abórdalas proactivamente.
"""
    con_prompt = f"""
Eres el DEFENSOR de la opción: "{option_b}".
Debes argumentar a favor de esta opción frente a "{option_a}".
La pregunta es: {question}

Presenta tus 3 mejores argumentos. Sé específico con datos y ejemplos reales.
No seas genérico. Conoce las debilidades de tu posición y abórdalas proactivamente.
"""

    pro_position = await _agent_respond(pro_prompt, config.tools)
    con_position = await _agent_respond(con_prompt, config.tools)
    transcript.append({"round": 0, "pro": pro_position, "con": con_position})

    # Rondas de debate
    for round_num in range(1, config.rounds + 1):
        pro_rebut_prompt = f"""
Contexto: Debate sobre "{question}"
Tu posición: {option_a}
Tu argumento anterior: {transcript[-1]['pro']}

El oponente respondió:
{transcript[-1]['con']}

REBATE sus argumentos. Para cada punto del oponente:
1. Reconoce si tiene algo de razón (honestidad intelectual)
2. Explica por qué tu posición sigue siendo superior
3. Añade un argumento nuevo que el oponente no consideró
"""
        con_rebut_prompt = f"""
Contexto: Debate sobre "{question}"
Tu posición: {option_b}
Tu argumento anterior: {transcript[-1]['con']}

El oponente respondió:
{transcript[-1]['pro']}

REBATE sus argumentos. Para cada punto del oponente:
1. Reconoce si tiene algo de razón (honestidad intelectual)
2. Explica por qué tu posición sigue siendo superior
3. Añade un argumento nuevo que el oponente no consideró
"""
        pro_rebuttal = await _agent_respond(pro_rebut_prompt, config.tools)
        con_rebuttal = await _agent_respond(con_rebut_prompt, config.tools)
        transcript.append({
            "round": round_num,
            "pro": pro_rebuttal,
            "con": con_rebuttal
        })

    # Árbitro
    debate_summary = "\n\n".join([
        f"=== Ronda {t['round']} ===\n"
        f"PRO ({option_a}):\n{t['pro']}\n\n"
        f"CONTRA ({option_b}):\n{t['con']}"
        for t in transcript
    ])

    arbitro_prompt = f"""
Eres un ÁRBITRO imparcial. Has presenciado el siguiente debate:

Pregunta: {question}

{debate_summary}

Tu tarea:
1. Evalúa la calidad de los argumentos de cada lado (no tu preferencia personal)
2. Identifica qué argumentos fueron más sólidos y por qué
3. Determina qué opción ganó el debate basándote en la calidad argumentativa
4. Proporciona una recomendación final con justificación de 3-5 oraciones

Formato de respuesta:
GANADOR: [nombre de la opción]
JUSTIFICACIÓN: [3-5 oraciones explicando el razonamiento]
ARGUMENTOS CLAVE: [los 2-3 argumentos más decisivos]
ADVERTENCIAS: [condiciones bajo las cuales la decisión podría cambiar]
"""
    arbitro_result = await _agent_respond(arbitro_prompt, config.tools)

    return {
        "question": question,
        "option_a": option_a,
        "option_b": option_b,
        "arbitro_decision": arbitro_result,
        "transcript": transcript
    }


# Ejemplo: Debate de arquitectura
async def main():
    result = await debate_pattern(
        question="¿Qué base de datos usar para el sistema de sesiones de agentes?",
        option_a="PostgreSQL con tabla de sesiones",
        option_b="Redis con TTL automático",
        config=DebateConfig(rounds=2, tools=["Bash"])
    )

    print("=== DECISIÓN DEL ÁRBITRO ===")
    print(result["arbitro_decision"])

if __name__ == "__main__":
    asyncio.run(main())

Debate en TypeScript

// debate-pattern.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface DebateRound {
  round: number;
  pro: string;
  con: string;
}

interface DebateResult {
  question: string;
  optionA: string;
  optionB: string;
  arbiterDecision: string;
  transcript: DebateRound[];
}

async function runDebateAgent(prompt: string): Promise<string> {
  const results: string[] = [];
  for await (const message of query(prompt, {
    allowedTools: ["Bash"],
    maxTurns: 10,
  } as ClaudeCodeOptions)) {
    if ("content" in message && message.content) {
      results.push(String(message.content));
    }
  }
  return results.join("\n");
}

async function debatePattern(
  question: string,
  optionA: string,
  optionB: string,
  rounds = 2
): Promise<DebateResult> {
  const transcript: DebateRound[] = [];

  // Ronda inicial
  const proArg = await runDebateAgent(
    `Defiende "${optionA}" vs "${optionB}" para: ${question}. 3 argumentos específicos.`
  );
  const conArg = await runDebateAgent(
    `Defiende "${optionB}" vs "${optionA}" para: ${question}. 3 argumentos específicos.`
  );
  transcript.push({ round: 0, pro: proArg, con: conArg });

  // Rondas de rebatimiento
  for (let r = 1; r <= rounds; r++) {
    const prev = transcript[transcript.length - 1];
    const [proRebut, conRebut] = await Promise.all([
      runDebateAgent(
        `Tu posición: ${optionA}. Rebate: "${prev.con}". Añade argumento nuevo.`
      ),
      runDebateAgent(
        `Tu posición: ${optionB}. Rebate: "${prev.pro}". Añade argumento nuevo.`
      ),
    ]);
    transcript.push({ round: r, pro: proRebut, con: conRebut });
  }

  // Árbitro
  const summary = transcript
    .map(
      (t) =>
        `Ronda ${t.round}: PRO: ${t.pro.slice(0, 300)}... CON: ${t.con.slice(0, 300)}...`
    )
    .join("\n\n");

  const arbiterDecision = await runDebateAgent(
    `Como árbitro imparcial, evalúa este debate sobre "${question}":\n${summary}\n\n` +
      `Indica GANADOR, JUSTIFICACIÓN y ARGUMENTOS CLAVE.`
  );

  return { question, optionA, optionB, arbiterDecision, transcript };
}

export { debatePattern, DebateResult };

4. Patrón Ensemble

Múltiples Agentes Resolviendo el Mismo Problema

El Ensemble es útil cuando un solo agente puede tomar caminos incorrectos, pero múltiples agentes con los mismos recursos tienen baja probabilidad de cometer el mismo error.

flowchart TD
    Task["Tarea"] --> |fork| A1["Agente 1\nSolución A"]
    Task --> |fork| A2["Agente 2\nSolución B"]
    Task --> |fork| A3["Agente 3\nSolución C"]

    A1 --> Aggregator["Agregador"]
    A2 --> Aggregator
    A3 --> Aggregator

    Aggregator --> Vote["Voting\n(mayoría decide)"]
    Aggregator --> Combine["Combination\n(mejores partes)"]

    Vote --> FinalResult["Resultado Final"]
    Combine --> FinalResult

Implementación Python

# ensemble_pattern.py
import asyncio
from collections import Counter
from claude_code_sdk import query, ClaudeCodeOptions


async def _solve_independently(
    task: str,
    agent_id: int,
    tools: list[str]
) -> tuple[int, str]:
    """Un agente del ensemble resuelve el problema."""
    prompt = f"""
Tarea: {task}

Eres el Agente {agent_id} de un ensemble. Resuelve de forma independiente.
No asumas que otros agentes están resolviendo esto — da tu mejor solución.
"""
    results = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return agent_id, "\n".join(results)


async def _vote_on_solutions(
    task: str,
    solutions: list[tuple[int, str]],
    tools: list[str]
) -> str:
    """Agente árbitro que elige la mejor solución."""
    solutions_text = "\n\n".join([
        f"=== Solución del Agente {agent_id} ===\n{solution}"
        for agent_id, solution in solutions
    ])

    arbitro_prompt = f"""
Tarea original: {task}

Múltiples agentes propusieron estas soluciones:
{solutions_text}

Evalúa cada solución con estos criterios:
1. Correctitud técnica (40%)
2. Completitud (30%)
3. Calidad del código/explicación (20%)
4. Eficiencia (10%)

Selecciona LA MEJOR solución o, si es posible, combina las partes mejores de cada una.
Explica brevemente por qué seleccionaste/combinaste así.

Entrega la solución final completa.
"""
    results = []
    async for message in query(
        prompt=arbitro_prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def ensemble_pattern(
    task: str,
    num_agents: int = 3,
    tools: list[str] | None = None
) -> str:
    """
    Ensemble: múltiples agentes resuelven en paralelo, árbitro elige la mejor.

    Args:
        task: Tarea a resolver
        num_agents: Número de agentes en el ensemble
        tools: Herramientas disponibles para los agentes

    Returns:
        La mejor solución del ensemble
    """
    if tools is None:
        tools = ["Bash"]

    print(f"[Ensemble] Lanzando {num_agents} agentes en paralelo...")

    # Ejecutar todos los agentes en paralelo
    coroutines = [
        _solve_independently(task, i + 1, tools)
        for i in range(num_agents)
    ]
    solutions = await asyncio.gather(*coroutines)

    print(f"[Ensemble] {len(solutions)} soluciones recibidas, votando...")

    # Agregar y seleccionar la mejor
    best = await _vote_on_solutions(task, list(solutions), tools)

    print(f"[Ensemble] Solución final seleccionada ({len(best)} chars)")
    return best


# Ensemble con voting simple para respuestas cortas
async def ensemble_voting(
    question: str,
    num_agents: int = 5
) -> str:
    """
    Ensemble con voting por mayoría para respuestas cortas (clasificación, etc.)
    """
    async def single_answer(agent_id: int) -> str:
        results = []
        async for message in query(
            prompt=f"{question}\n\nResponde en UNA sola línea con solo la respuesta, sin explicación.",
            options=ClaudeCodeOptions(allowed_tools=[], max_turns=3)
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
        return "\n".join(results).strip()

    answers = await asyncio.gather(*[
        single_answer(i) for i in range(num_agents)
    ])

    # Voting por mayoría
    vote_count = Counter(
        answer.lower().strip() for answer in answers if answer
    )
    winner, count = vote_count.most_common(1)[0]

    print(f"[Ensemble Voting] Votos: {dict(vote_count)}")
    print(f"[Ensemble Voting] Ganador: '{winner}' con {count}/{num_agents} votos")

    return winner

5. Verificación Cruzada

Agente Generador + Agente Verificador

La verificación cruzada es fundamental cuando el costo de un error es alto: código de producción, migraciones de datos, configuración de seguridad.

flowchart LR
    Task["Tarea"] --> Generator["Agente Generador\n(produce código/output)"]
    Generator --> Output["Output v1"]
    Output --> Verifier["Agente Verificador\n(ejecuta tests, revisa)"]

    Verifier --> |"✓ Pasa verificación"| Result["Resultado Final"]
    Verifier --> |"✗ Falla verificación\ncon detalles"| Generator

    style Result fill:#0a0,color:#fff
    style Verifier fill:#55f,color:#fff

Implementación Python

# cross_verification.py
import asyncio
from dataclasses import dataclass
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class VerificationResult:
    passed: bool
    issues: list[str]
    suggestions: list[str]
    confidence: float  # 0.0 a 1.0


async def _generate(task: str, feedback: str | None = None) -> str:
    """Agente generador."""
    prompt = task if not feedback else f"""
Tarea original: {task}

Tu output anterior fue rechazado con estas observaciones:
{feedback}

Genera una nueva versión corrigiendo TODOS los problemas señalados.
"""
    results = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(
            allowed_tools=["Read", "Edit", "Bash"],
            max_turns=30
        )
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def _verify(task: str, output: str, verification_criteria: list[str]) -> VerificationResult:
    """Agente verificador: ejecuta tests y revisa el output."""
    criteria_text = "\n".join(f"- {c}" for c in verification_criteria)

    verifier_prompt = f"""
Eres un VERIFICADOR riguroso. Tu trabajo es encontrar problemas, no aprobar fácilmente.

Tarea original: {task}

Output a verificar:
{output}

Criterios de verificación:
{criteria_text}

Para cada criterio:
1. Verifica explícitamente (ejecuta código, lee archivos, etc.)
2. Documenta el resultado: PASA o FALLA con detalles específicos

Si hay código, EJECUTA los tests y reporta los resultados reales.

Responde en este formato exacto:
RESULTADO_GLOBAL: APROBADO|RECHAZADO
CONFIANZA: [0.0-1.0]
CRITERIOS:
- [criterio]: PASA|FALLA - [detalle específico]
ISSUES:
- [lista de problemas específicos, si hay]
SUGERENCIAS:
- [mejoras sugeridas]
"""
    results = []
    async for message in query(
        prompt=verifier_prompt,
        options=ClaudeCodeOptions(
            allowed_tools=["Read", "Bash"],  # Solo lectura y ejecución
            max_turns=20
        )
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    verification_text = "\n".join(results)

    # Parsear resultado
    passed = "RESULTADO_GLOBAL: APROBADO" in verification_text
    issues = []
    suggestions = []
    confidence = 0.5

    for line in verification_text.split("\n"):
        line = line.strip()
        if line.startswith("- ") and "FALLA" in line:
            issues.append(line[2:])
        elif line.startswith("- ") and "PASA" not in line and "FALLA" not in line:
            if "ISSUES:" in verification_text:
                issues.append(line[2:])
        if "CONFIANZA:" in line:
            try:
                confidence = float(line.split(":")[1].strip())
            except (ValueError, IndexError):
                pass

    return VerificationResult(
        passed=passed,
        issues=issues,
        suggestions=suggestions,
        confidence=confidence
    )


async def cross_verification_pattern(
    task: str,
    verification_criteria: list[str],
    max_attempts: int = 3
) -> tuple[str, list[VerificationResult]]:
    """
    Patrón de verificación cruzada.

    Returns:
        (output_final, historial_de_verificaciones)
    """
    verifications = []
    last_feedback = None

    for attempt in range(1, max_attempts + 1):
        print(f"[CrossVerify] Intento {attempt}/{max_attempts}")

        # Generar
        output = await _generate(task, last_feedback)

        # Verificar
        result = await _verify(task, output, verification_criteria)
        verifications.append(result)

        print(f"[CrossVerify] {'✓ Aprobado' if result.passed else '✗ Rechazado'} "
              f"(confianza: {result.confidence:.0%})")

        if result.passed and result.confidence >= 0.8:
            return output, verifications

        # Preparar feedback para el siguiente intento
        last_feedback = "\n".join([
            "Problemas encontrados:",
            *[f"  - {issue}" for issue in result.issues],
        ])

    print("[CrossVerify] Máximo de intentos alcanzado, retornando mejor resultado")
    return output, verifications

6. Self-Improvement Loop

El Agente Aprende de Sus Errores

# self_improvement.py
import asyncio
import json
from dataclasses import dataclass, field, asdict
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class AgentMemory:
    """Memoria de sesión del agente con errores y aprendizajes."""
    successful_patterns: list[str] = field(default_factory=list)
    failed_patterns: list[str] = field(default_factory=list)
    learned_rules: list[str] = field(default_factory=list)
    iteration_count: int = 0

    def to_context_string(self) -> str:
        """Convierte la memoria en texto para incluir en el prompt."""
        if not any([self.successful_patterns, self.failed_patterns, self.learned_rules]):
            return ""

        parts = ["=== Tu historial de aprendizaje en esta sesión ==="]
        if self.learned_rules:
            parts.append("Reglas aprendidas:")
            parts.extend(f"  - {r}" for r in self.learned_rules[-5:])  # Últimas 5
        if self.failed_patterns:
            parts.append("Patrones que han fallado:")
            parts.extend(f"  - {p}" for p in self.failed_patterns[-3:])  # Últimos 3
        if self.successful_patterns:
            parts.append("Patrones exitosos:")
            parts.extend(f"  - {p}" for p in self.successful_patterns[-3:])
        return "\n".join(parts)


async def _run_with_memory(
    task: str,
    memory: AgentMemory,
    tools: list[str]
) -> str:
    """Ejecuta el agente con contexto de memoria."""
    memory_context = memory.to_context_string()

    prompt = f"""
{memory_context}

Tarea actual (intento {memory.iteration_count + 1}): {task}

Aplica lo aprendido de intentos anteriores.
Si encuentras un error, documéntalo claramente para mejorar en el próximo intento.
"""
    results = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def _extract_learnings(
    task: str,
    output: str,
    success: bool,
    memory: AgentMemory,
    tools: list[str]
) -> AgentMemory:
    """Extrae aprendizajes del intento y actualiza la memoria."""
    learning_prompt = f"""
Analiza este intento de resolver una tarea:

Tarea: {task}
Output producido: {output[:500]}...
¿Fue exitoso? {success}

Extrae:
1. Si fue exitoso: ¿qué patrones funcionaron bien?
2. Si falló: ¿qué patrones deben evitarse?
3. ¿Qué regla general aprendiste?

Responde en JSON:
{{
  "successful_pattern": "descripción breve si fue exitoso, null si no",
  "failed_pattern": "descripción breve si falló, null si no",
  "learned_rule": "regla general aprendida"
}}
"""
    results = []
    async for message in query(
        prompt=learning_prompt,
        options=ClaudeCodeOptions(allowed_tools=[], max_turns=5)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    raw = "\n".join(results)

    try:
        # Extraer JSON del response
        import re
        json_match = re.search(r'\{[^}]+\}', raw, re.DOTALL)
        if json_match:
            learnings = json.loads(json_match.group())
            if learnings.get("successful_pattern"):
                memory.successful_patterns.append(learnings["successful_pattern"])
            if learnings.get("failed_pattern"):
                memory.failed_patterns.append(learnings["failed_pattern"])
            if learnings.get("learned_rule"):
                memory.learned_rules.append(learnings["learned_rule"])
    except (json.JSONDecodeError, AttributeError):
        pass

    memory.iteration_count += 1
    return memory


async def self_improvement_loop(
    task: str,
    success_criteria: str,
    max_iterations: int = 5,
    tools: list[str] | None = None
) -> tuple[str, AgentMemory]:
    """
    Loop de auto-mejora: el agente aprende de cada intento.

    Returns:
        (mejor_output, memoria_final)
    """
    if tools is None:
        tools = ["Read", "Bash"]

    memory = AgentMemory()
    best_output = ""

    for iteration in range(max_iterations):
        print(f"[SelfImprove] Iteración {iteration + 1}/{max_iterations}")
        print(f"[SelfImprove] Reglas aprendidas hasta ahora: {len(memory.learned_rules)}")

        output = await _run_with_memory(task, memory, tools)

        # Verificar si cumple criterios
        eval_prompt = f"""
Criterio de éxito: {success_criteria}
Output a evaluar: {output[:1000]}

¿Cumple el criterio? Responde solo: SI o NO
"""
        eval_result = []
        async for message in query(
            prompt=eval_prompt,
            options=ClaudeCodeOptions(allowed_tools=[], max_turns=3)
        ):
            if hasattr(message, 'content') and message.content:
                eval_result.append(str(message.content))
        success = "SI" in "\n".join(eval_result).upper()

        best_output = output
        memory = await _extract_learnings(task, output, success, memory, tools)

        if success:
            print(f"[SelfImprove] ✓ Éxito en iteración {iteration + 1}")
            break

    return best_output, memory

7. Agentes Especializados con Routing

Router que Decide el Agente Más Apto

# specialist_routing.py
import asyncio
from dataclasses import dataclass
from enum import Enum
from claude_code_sdk import query, ClaudeCodeOptions


class AgentSpecialty(str, Enum):
    PYTHON = "python"
    TYPESCRIPT = "typescript"
    SQL = "sql"
    SECURITY = "security"
    DEVOPS = "devops"
    GENERAL = "general"


@dataclass
class Specialist:
    specialty: AgentSpecialty
    system_prompt: str
    tools: list[str]
    max_turns: int = 30


SPECIALISTS: dict[AgentSpecialty, Specialist] = {
    AgentSpecialty.PYTHON: Specialist(
        specialty=AgentSpecialty.PYTHON,
        system_prompt="""Eres un experto en Python con 15 años de experiencia.
Conoces profundamente: asyncio, type hints, dataclasses, pydantic, fastapi,
testing con pytest, packaging, y mejores prácticas de la comunidad Python.""",
        tools=["Read", "Edit", "Bash"]
    ),
    AgentSpecialty.TYPESCRIPT: Specialist(
        specialty=AgentSpecialty.TYPESCRIPT,
        system_prompt="""Eres un experto en TypeScript con 10 años de experiencia.
Conoces profundamente: tipos avanzados, generics, decorators, Node.js, React,
build tools (vite, esbuild), testing con vitest, y el ecosistema npm.""",
        tools=["Read", "Edit", "Bash"]
    ),
    AgentSpecialty.SQL: Specialist(
        specialty=AgentSpecialty.SQL,
        system_prompt="""Eres un experto en SQL y bases de datos relacionales.
Conoces PostgreSQL, MySQL, SQLite en profundidad. Optimización de queries,
índices, transacciones, migraciones y diseño de schemas.""",
        tools=["Read", "Bash"]
    ),
    AgentSpecialty.SECURITY: Specialist(
        specialty=AgentSpecialty.SECURITY,
        system_prompt="""Eres un experto en seguridad de aplicaciones web.
Conoces OWASP Top 10, vulnerabilidades comunes, autenticación/autorización,
criptografía, secrets management y secure coding practices.""",
        tools=["Read", "Bash"]
    ),
    AgentSpecialty.DEVOPS: Specialist(
        specialty=AgentSpecialty.DEVOPS,
        system_prompt="""Eres un experto en DevOps y operaciones.
Conoces Docker, Kubernetes, CI/CD, monitoring, logging, IaC con Terraform,
AWS/GCP/Azure, y SRE practices.""",
        tools=["Read", "Bash"]
    ),
}


async def _classify_task(task: str) -> AgentSpecialty:
    """Usa LLM para clasificar la tarea y elegir el especialista adecuado."""
    specialties_list = "\n".join([
        f"- {s.value}: {SPECIALISTS[s].system_prompt[:100]}..."
        for s in AgentSpecialty if s != AgentSpecialty.GENERAL
    ])

    classifier_prompt = f"""
Clasifica esta tarea para asignarla al especialista más adecuado:
Tarea: {task}

Especialistas disponibles:
{specialties_list}
- general: Para tareas que no encajan en ninguna especialidad

Responde SOLO con el nombre de la especialidad (una palabra):
python|typescript|sql|security|devops|general
"""
    results = []
    async for message in query(
        prompt=classifier_prompt,
        options=ClaudeCodeOptions(allowed_tools=[], max_turns=3)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    raw = "\n".join(results).strip().lower()

    for specialty in AgentSpecialty:
        if specialty.value in raw:
            return specialty

    return AgentSpecialty.GENERAL


async def run_specialist(task: str, specialty: AgentSpecialty) -> str:
    """Ejecuta el agente especialista para la tarea."""
    specialist = SPECIALISTS.get(specialty)

    if not specialist:
        # Fallback a agente general
        results = []
        async for message in query(
            prompt=task,
            options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"], max_turns=30)
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
        return "\n".join(results)

    full_prompt = f"{specialist.system_prompt}\n\nTarea: {task}"
    results = []
    async for message in query(
        prompt=full_prompt,
        options=ClaudeCodeOptions(
            allowed_tools=specialist.tools,
            max_turns=specialist.max_turns
        )
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def routed_specialist(task: str) -> tuple[str, AgentSpecialty]:
    """Router + especialista: clasifica y ejecuta con el experto correcto."""
    print(f"[Router] Clasificando tarea: {task[:80]}...")
    specialty = await _classify_task(task)
    print(f"[Router] Especialista seleccionado: {specialty.value}")

    result = await run_specialist(task, specialty)
    return result, specialty

8. Comunicación Indirecta via Artefactos

Agentes que se Comunican por Archivos

# artifact_communication.py
import asyncio
import os
import json
import time
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions


async def producer_agent(
    task: str,
    output_file: str,
    tools: list[str] | None = None
) -> None:
    """
    Agente productor: escribe resultados a un archivo para que
    el consumidor los procese.
    """
    if tools is None:
        tools = ["Read", "Bash", "Write"]

    prompt = f"""
{task}

Cuando termines, guarda tu resultado en el archivo: {output_file}
El formato debe ser JSON con esta estructura:
{{
  "status": "completed",
  "timestamp": <unix_timestamp>,
  "result": <tu resultado>,
  "metadata": {{}}
}}
"""
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
    ):
        pass  # El agente escribe directamente al archivo


async def consumer_agent(
    input_file: str,
    processing_task: str,
    output_file: str,
    poll_interval: float = 2.0,
    timeout: float = 300.0
) -> bool:
    """
    Agente consumidor: espera que el archivo aparezca y lo procesa.

    Returns:
        True si procesó exitosamente, False si timeout
    """
    print(f"[Consumer] Esperando archivo: {input_file}")
    deadline = time.time() + timeout

    # Polling hasta que el archivo esté disponible
    while time.time() < deadline:
        if os.path.exists(input_file):
            try:
                with open(input_file) as f:
                    data = json.load(f)
                if data.get("status") == "completed":
                    break
            except (json.JSONDecodeError, KeyError):
                pass
        await asyncio.sleep(poll_interval)
    else:
        print(f"[Consumer] Timeout esperando {input_file}")
        return False

    print(f"[Consumer] Archivo recibido, procesando...")

    prompt = f"""
{processing_task}

El input está en el archivo: {input_file}
Lee el archivo, procesa el contenido y guarda el resultado en: {output_file}
"""
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(
            allowed_tools=["Read", "Bash"],
            max_turns=20
        )
    ):
        pass

    return True


async def artifact_pipeline(work_dir: str = "/tmp/agent_artifacts") -> None:
    """
    Pipeline de agentes que se comunican solo via archivos.
    No hay coordinador central.
    """
    Path(work_dir).mkdir(parents=True, exist_ok=True)

    analysis_file = f"{work_dir}/analysis.json"
    review_file = f"{work_dir}/review.json"
    report_file = f"{work_dir}/report.md"

    # Los 3 agentes corren "independientemente"
    # Solo se sincronizan por la existencia de los archivos
    await asyncio.gather(
        # Agente 1: Analiza el código
        producer_agent(
            task="Analiza el directorio src/ y encuentra todos los TODO comments",
            output_file=analysis_file
        ),
        # Agente 2: Espera el análisis y lo revisa
        consumer_agent(
            input_file=analysis_file,
            processing_task="Prioriza los TODO comments por urgencia e impacto",
            output_file=review_file
        ),
        # Agente 3: Genera reporte final cuando la review esté lista
        consumer_agent(
            input_file=review_file,
            processing_task="Genera un reporte markdown con los TODO priorizados",
            output_file=report_file
        ),
    )

    print(f"[Pipeline] Reporte generado en: {report_file}")

9. Supervisión Humana Escalable

Dashboard de Aprobaciones Pendientes

# human_supervision.py
import asyncio
import uuid
import json
from dataclasses import dataclass, asdict, field
from typing import Callable, Awaitable
from enum import Enum
import redis.asyncio as redis


class ApprovalStatus(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    TIMEOUT = "timeout"


@dataclass
class ApprovalRequest:
    request_id: str
    session_id: str
    action_type: str
    action_description: str
    action_details: dict
    risk_level: str  # "low", "medium", "high", "critical"
    requester: str
    timeout_seconds: int = 300
    status: ApprovalStatus = ApprovalStatus.PENDING
    reviewer_comment: str = ""

    def to_dict(self) -> dict:
        d = asdict(self)
        d["status"] = self.status.value
        return d


class HumanApprovalGateway:
    """
    Gateway para solicitar aprobación humana en acciones del agente.
    Usa Redis para comunicación entre el agente y el dashboard.
    """

    PENDING_QUEUE = "approvals:pending"
    RESULTS_PREFIX = "approvals:result:"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def request_approval(
        self,
        session_id: str,
        action_type: str,
        action_description: str,
        action_details: dict,
        risk_level: str = "medium",
        timeout_seconds: int = 300
    ) -> ApprovalStatus:
        """
        Solicita aprobación humana y espera la respuesta.

        En acciones de alto riesgo, el agente se bloquea hasta que
        un humano aprueba o rechaza.
        """
        request_id = str(uuid.uuid4())[:8]

        request = ApprovalRequest(
            request_id=request_id,
            session_id=session_id,
            action_type=action_type,
            action_description=action_description,
            action_details=action_details,
            risk_level=risk_level,
            requester=f"agent:{session_id}",
            timeout_seconds=timeout_seconds
        )

        # Publicar en queue
        await self.redis.lpush(
            self.PENDING_QUEUE,
            json.dumps(request.to_dict())
        )

        # Notificar al dashboard (via pub/sub)
        await self.redis.publish(
            "approvals:new",
            json.dumps({"request_id": request_id, "risk_level": risk_level})
        )

        print(f"[Approval] Esperando aprobación para: {action_description[:80]}")

        # Esperar respuesta
        deadline = asyncio.get_event_loop().time() + timeout_seconds

        while asyncio.get_event_loop().time() < deadline:
            result = await self.redis.get(f"{self.RESULTS_PREFIX}{request_id}")
            if result:
                data = json.loads(result)
                status = ApprovalStatus(data["status"])
                print(f"[Approval] Decisión recibida: {status.value}")
                return status
            await asyncio.sleep(2.0)

        # Timeout: registrar y usar fallback
        print(f"[Approval] Timeout esperando aprobación para {request_id}")
        return ApprovalStatus.TIMEOUT

    async def get_pending_approvals(self) -> list[dict]:
        """Obtiene todas las aprobaciones pendientes para el dashboard."""
        items = await self.redis.lrange(self.PENDING_QUEUE, 0, -1)
        return [json.loads(item) for item in items]

    async def submit_decision(
        self,
        request_id: str,
        approved: bool,
        comment: str = ""
    ) -> None:
        """El humano toma una decisión desde el dashboard."""
        status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED

        await self.redis.setex(
            f"{self.RESULTS_PREFIX}{request_id}",
            3600,
            json.dumps({"status": status.value, "comment": comment})
        )


# ============================================================
# Hook para el SDK que solicita aprobación antes de acciones críticas
# ============================================================

def create_approval_hook(
    gateway: HumanApprovalGateway,
    session_id: str,
    high_risk_tools: list[str] | None = None
) -> Callable:
    """
    Crea un hook de PreToolUse que solicita aprobación para
    herramientas de alto riesgo.
    """
    if high_risk_tools is None:
        high_risk_tools = ["Bash"]  # Bash puede ejecutar cualquier comando

    async def pre_tool_hook(tool_name: str, tool_input: dict) -> dict | None:
        """Intercepta llamadas a herramientas de alto riesgo."""
        if tool_name not in high_risk_tools:
            return None  # Permitir sin aprobación

        # Determinar nivel de riesgo basado en el comando
        command = tool_input.get("command", "")
        if any(dangerous in command for dangerous in ["rm -rf", "DROP TABLE", "DELETE FROM"]):
            risk_level = "critical"
            timeout = 600  # 10 minutos para decisiones críticas
        else:
            risk_level = "medium"
            timeout = 300  # 5 minutos normal

        status = await gateway.request_approval(
            session_id=session_id,
            action_type=f"tool:{tool_name}",
            action_description=f"Ejecutar {tool_name}: {command[:100]}",
            action_details=tool_input,
            risk_level=risk_level,
            timeout_seconds=timeout
        )

        if status == ApprovalStatus.APPROVED:
            return None  # Proceder con la herramienta

        if status == ApprovalStatus.REJECTED:
            return {
                "error": "Acción rechazada por revisión humana",
                "action": "skip_tool"
            }

        # Timeout: fallback conservador (no ejecutar)
        return {
            "error": "Timeout de aprobación: acción no ejecutada por seguridad",
            "action": "skip_tool"
        }

    return pre_tool_hook

10. Sistema de Agentes con Memoria Compartida

Vector Database como Memoria Compartida

# shared_memory_agents.py
import asyncio
import json
import hashlib
from dataclasses import dataclass
from typing import Any
from claude_code_sdk import query, ClaudeCodeOptions

# Simulación de vector DB (en producción usar ChromaDB, Pinecone, Weaviate)
class SimpleVectorMemory:
    """
    Memoria compartida simple basada en búsqueda por keywords.
    En producción, reemplazar con ChromaDB o Pinecone para búsqueda semántica real.
    """

    def __init__(self):
        self._store: dict[str, dict] = {}

    def store(
        self,
        content: str,
        metadata: dict | None = None,
        tags: list[str] | None = None
    ) -> str:
        """Almacena un fragmento de conocimiento."""
        doc_id = hashlib.md5(content.encode()).hexdigest()[:8]
        self._store[doc_id] = {
            "id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "tags": tags or [],
        }
        return doc_id

    def search(self, query_text: str, top_k: int = 5) -> list[dict]:
        """Búsqueda simple por keywords."""
        query_words = set(query_text.lower().split())
        scored = []

        for doc in self._store.values():
            doc_words = set(doc["content"].lower().split())
            tag_words = set(" ".join(doc["tags"]).lower().split())
            all_words = doc_words | tag_words

            score = len(query_words & all_words) / max(len(query_words), 1)
            if score > 0:
                scored.append((score, doc))

        scored.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in scored[:top_k]]


async def agent_with_shared_memory(
    task: str,
    memory: SimpleVectorMemory,
    agent_name: str = "agent",
    tools: list[str] | None = None
) -> str:
    """
    Agente que consulta y actualiza la memoria compartida antes y después
    de ejecutar su tarea.
    """
    if tools is None:
        tools = ["Read", "Bash"]

    # Paso 1: Recuperar conocimiento relevante de la memoria
    relevant_docs = memory.search(task, top_k=3)

    if relevant_docs:
        memory_context = "=== Conocimiento previo relevante ===\n"
        for doc in relevant_docs:
            memory_context += f"\n--- {doc['id']} ---\n{doc['content'][:300]}\n"
    else:
        memory_context = ""

    # Paso 2: Ejecutar tarea con contexto de memoria
    full_prompt = f"""
{memory_context}

Tarea actual: {task}

Aplica el conocimiento previo si es relevante.
Si descubres algo nuevo o útil durante tu trabajo, inclúyelo al final del resultado
en una sección llamada "=== NUEVO CONOCIMIENTO ===".
"""

    results = []
    async for message in query(
        prompt=full_prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    full_result = "\n".join(results)

    # Paso 3: Extraer nuevo conocimiento y guardarlo en memoria compartida
    if "=== NUEVO CONOCIMIENTO ===" in full_result:
        new_knowledge_section = full_result.split("=== NUEVO CONOCIMIENTO ===")[1]
        if new_knowledge_section.strip():
            memory.store(
                content=new_knowledge_section.strip(),
                metadata={"agent": agent_name, "task_preview": task[:100]},
                tags=task.split()[:5]  # Primeras 5 palabras como tags
            )
            print(f"[Memory] {agent_name} guardó nuevo conocimiento")

    return full_result


async def multi_agent_with_shared_memory(tasks: list[str]) -> list[str]:
    """
    Múltiples agentes que comparten conocimiento acumulado.
    El segundo agente se beneficia de lo que aprendió el primero.
    """
    shared_memory = SimpleVectorMemory()
    results = []

    # Los agentes se ejecutan secuencialmente para que puedan compartir conocimiento
    for i, task in enumerate(tasks):
        print(f"\n[SharedMemory] Agente {i+1} procesando: {task[:60]}...")
        result = await agent_with_shared_memory(
            task=task,
            memory=shared_memory,
            agent_name=f"agent_{i+1}"
        )
        results.append(result)
        print(f"[SharedMemory] Documentos en memoria: {len(shared_memory._store)}")

    return results


# Ejemplo de uso
async def main():
    tasks = [
        "Analiza el archivo config.py y documenta todas las configuraciones",
        "¿Cuáles configuraciones de config.py son críticas para seguridad?",
        "Genera un checklist de validación para las configuraciones de seguridad",
    ]

    results = await multi_agent_with_shared_memory(tasks)

    for i, result in enumerate(results, 1):
        print(f"\n=== Resultado Agente {i} ===")
        print(result[:500])

if __name__ == "__main__":
    asyncio.run(main())

Resumen del Capítulo

Los patrones avanzados expanden significativamente las capacidades de los sistemas multi-agente:

flowchart TB
    subgraph Calidad["Para mejorar calidad"]
        Reflection["Reflection\nGenerar → Criticar → Mejorar"]
        CrossVerify["Verificación Cruzada\nGenerador + Verificador"]
        Ensemble["Ensemble\nMúltiples → Voting"]
    end

    subgraph Decision["Para mejores decisiones"]
        Debate["Debate\nPro vs Contra + Árbitro"]
        Router["Routing\nClasificar → Especialista"]
    end

    subgraph Learning["Para aprender y adaptarse"]
        SelfImprove["Self-Improvement\nAprender de errores"]
        SharedMem["Memoria Compartida\nConocimiento acumulado"]
    end

    subgraph Coordination["Para coordinación flexible"]
        Artifacts["Artefactos\nComunicación por archivos"]
        HumanSuper["Supervisión Humana\nAprobación escalable"]
    end

Cuándo usar cada patrón:

Próximo capítulo: Capítulo 20: SDK Internals y Extensiones — Cómo funciona el SDK por dentro y cómo extenderlo.


11. Patrón Critic-Actor Profundo

Más Allá del Reflection: Rúbricas Explícitas

El patrón Reflection básico tiene un critic que juzga de forma holística. El patrón Critic-Actor Profundo va más lejos: el critic usa una rúbrica con pesos que define criterios explícitos y puntuaciones. Esto hace el proceso transparente, reproducible y alineado con los objetivos del negocio.

sequenceDiagram
    participant Actor
    participant Critic
    participant Rubrica as Rúbrica (pesos)

    Actor->>Actor: Genera documentación v1
    Actor->>Critic: Entrega documento

    loop Hasta score >= umbral
        Critic->>Rubrica: Evalúa contra criterios
        Note over Critic,Rubrica: Completitud 40%<br/>Claridad 30%<br/>Ejemplos 20%<br/>Formato 10%
        Rubrica->>Critic: Score por criterio
        Critic->>Actor: Feedback estructurado con scores
        Actor->>Actor: Mejora enfocada en criterios débiles
        Actor->>Critic: Nueva versión
    end

    Critic->>Actor: APROBADO (score >= 0.85)

Implementación con Rúbrica Pesada

# critic_actor_deep.py
import asyncio
import json
import re
from dataclasses import dataclass, field
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class RubricCriterion:
    name: str
    weight: float  # 0.0 a 1.0, la suma de todos debe ser 1.0
    description: str
    scoring_guide: str  # Descripción de qué puntúa 0.0, 0.5, 1.0


@dataclass
class CriticScore:
    criterion: str
    score: float  # 0.0 a 1.0
    feedback: str
    suggestions: list[str]


@dataclass
class IterationResult:
    iteration: int
    content: str
    total_score: float
    criterion_scores: list[CriticScore]
    approved: bool


DOCUMENTATION_RUBRIC = [
    RubricCriterion(
        name="completitud",
        weight=0.40,
        description="¿La documentación cubre todos los aspectos necesarios?",
        scoring_guide="0.0=falta información esencial, 0.5=cubre lo básico, 1.0=cobertura completa con edge cases"
    ),
    RubricCriterion(
        name="claridad",
        weight=0.30,
        description="¿Es clara y fácil de entender para el público objetivo?",
        scoring_guide="0.0=confusa o ambigua, 0.5=comprensible con esfuerzo, 1.0=cristalina y directa"
    ),
    RubricCriterion(
        name="ejemplos",
        weight=0.20,
        description="¿Incluye ejemplos concretos y funcionales?",
        scoring_guide="0.0=sin ejemplos, 0.5=ejemplos básicos, 1.0=ejemplos completos con contexto"
    ),
    RubricCriterion(
        name="formato",
        weight=0.10,
        description="¿Tiene estructura correcta, headings, y es scannable?",
        scoring_guide="0.0=muro de texto, 0.5=estructura básica, 1.0=bien estructurado y escaneable"
    ),
]


def build_rubric_prompt(rubric: list[RubricCriterion]) -> str:
    """Construye el texto de la rúbrica para el prompt del critic."""
    lines = ["=== RÚBRICA DE EVALUACIÓN ==="]
    for criterion in rubric:
        lines.append(
            f"\n**{criterion.name.upper()}** (peso: {criterion.weight:.0%})\n"
            f"Descripción: {criterion.description}\n"
            f"Guía de puntuación: {criterion.scoring_guide}"
        )
    return "\n".join(lines)


async def run_actor(task: str, previous_content: str, feedback: str, tools: list[str]) -> str:
    """Agente actor: genera o mejora contenido basado en feedback."""
    if previous_content:
        prompt = f"""
Tarea: {task}

Tu versión anterior:
{previous_content}

Feedback del crítico:
{feedback}

Mejora tu contenido atendiendo ESPECÍFICAMENTE el feedback.
Para cada punto de feedback, aplica la corrección sugerida.
Produce la versión completa mejorada.
"""
    else:
        prompt = f"""
Tarea: {task}

Produce una primera versión completa y detallada.
Incluye ejemplos concretos y estructura clara.
"""
    results: list[str] = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
    ):
        if hasattr(message, "content") and message.content:
            results.append(str(message.content))
    return "\n".join(results)


async def run_critic(
    task: str,
    content: str,
    rubric: list[RubricCriterion],
    tools: list[str]
) -> tuple[float, list[CriticScore]]:
    """
    Agente critic: evalúa el contenido contra la rúbrica.

    Returns:
        (total_score ponderado, lista de CriticScore por criterio)
    """
    rubric_text = build_rubric_prompt(rubric)
    criterion_names = [c.name for c in rubric]

    critic_prompt = f"""
Eres un CRÍTICO RIGUROSO. Tu objetivo es mejorar la calidad, no aprobar fácilmente.

Tarea que debía cumplir el actor: {task}

Contenido a evaluar:
{content}

{rubric_text}

Para cada criterio, evalúa y proporciona:
1. Score de 0.0 a 1.0 (con decimales)
2. Justificación específica (no genérica)
3. Sugerencias concretas para mejorar

Responde en JSON exactamente:
{{
  "evaluaciones": [
    {{
      "criterio": "<nombre del criterio>",
      "score": <float 0.0-1.0>,
      "feedback": "<justificación>",
      "sugerencias": ["<sugerencia 1>", "<sugerencia 2>"]
    }}
  ]
}}

Criterios: {criterion_names}
"""
    results: list[str] = []
    async for message in query(
        prompt=critic_prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=10)
    ):
        if hasattr(message, "content") and message.content:
            results.append(str(message.content))

    raw = "\n".join(results)
    scores: list[CriticScore] = []

    try:
        json_match = re.search(r'\{.*\}', raw, re.DOTALL)
        if json_match:
            data = json.loads(json_match.group())
            criterion_map = {c.name: c for c in rubric}
            for eval_data in data.get("evaluaciones", []):
                scores.append(CriticScore(
                    criterion=eval_data["criterio"],
                    score=float(eval_data["score"]),
                    feedback=eval_data["feedback"],
                    suggestions=eval_data.get("sugerencias", [])
                ))
    except (json.JSONDecodeError, KeyError):
        # Fallback si el JSON no se puede parsear
        for criterion in rubric:
            scores.append(CriticScore(
                criterion=criterion.name,
                score=0.5,
                feedback="No se pudo obtener evaluación estructurada",
                suggestions=[]
            ))

    # Calcular score ponderado
    total_score = 0.0
    weight_map = {c.name: c.weight for c in rubric}
    for cs in scores:
        total_score += cs.score * weight_map.get(cs.criterion, 0.0)

    return total_score, scores


def build_feedback_text(scores: list[CriticScore], threshold: float = 0.7) -> str:
    """Construye el texto de feedback para el actor."""
    weak_criteria = [cs for cs in scores if cs.score < threshold]
    lines = ["=== FEEDBACK DEL CRÍTICO ==="]

    for cs in weak_criteria:
        lines.append(f"\n**{cs.criterion.upper()}** (score: {cs.score:.2f})")
        lines.append(f"Problema: {cs.feedback}")
        for suggestion in cs.suggestions:
            lines.append(f"  -> {suggestion}")

    if not weak_criteria:
        lines.append("Todos los criterios superan el umbral mínimo.")

    return "\n".join(lines)


async def critic_actor_deep(
    task: str,
    rubric: list[RubricCriterion] | None = None,
    approval_threshold: float = 0.85,
    max_iterations: int = 4,
    actor_tools: list[str] | None = None,
    critic_tools: list[str] | None = None
) -> tuple[str, list[IterationResult]]:
    """
    Patrón Critic-Actor Profundo con rúbrica pesada.

    Returns:
        (contenido_final, historial_de_iteraciones)
    """
    if rubric is None:
        rubric = DOCUMENTATION_RUBRIC
    if actor_tools is None:
        actor_tools = ["Read", "Bash"]
    if critic_tools is None:
        critic_tools = ["Read"]

    history: list[IterationResult] = []
    current_content = ""
    feedback_text = ""

    for iteration in range(1, max_iterations + 1):
        print(f"\n[CriticActor] === Iteración {iteration}/{max_iterations} ===")

        # Actor genera/mejora
        current_content = await run_actor(task, current_content, feedback_text, actor_tools)
        print(f"[CriticActor] Actor produjo {len(current_content)} chars")

        # Critic evalúa con rúbrica
        total_score, criterion_scores = await run_critic(task, current_content, rubric, critic_tools)
        print(f"[CriticActor] Score total: {total_score:.3f} (umbral: {approval_threshold})")

        for cs in criterion_scores:
            status = "OK" if cs.score >= 0.7 else "MEJORA"
            print(f"  [{status}] {cs.criterion}: {cs.score:.2f}")

        approved = total_score >= approval_threshold
        history.append(IterationResult(
            iteration=iteration,
            content=current_content,
            total_score=total_score,
            criterion_scores=criterion_scores,
            approved=approved
        ))

        if approved:
            print(f"[CriticActor] Aprobado con score {total_score:.3f}")
            break

        # Preparar feedback enfocado en criterios débiles
        feedback_text = build_feedback_text(criterion_scores)

    return current_content, history


# Ejemplo: documentación técnica de una API
async def main():
    task = """
    Genera documentación técnica para la función:

    async def calculate_compound_interest(principal: float, rate: float, periods: int) -> float

    La documentación debe ser útil para desarrolladores Python que integren esta función.
    """
    content, history = await critic_actor_deep(
        task=task,
        approval_threshold=0.85,
        max_iterations=3
    )

    print("\n=== RESULTADO FINAL ===")
    print(f"Iteraciones: {len(history)}")
    print(f"Score final: {history[-1].total_score:.3f}")
    print(content[:500])


if __name__ == "__main__":
    asyncio.run(main())

12. Patrón Constitutional AI para Agentes

Agente que se Auto-Audita contra Principios

El Constitutional AI (CAI) es una técnica donde el agente evalúa su propio output contra un conjunto de principios predefinidos antes de entregarlo. Esto reduce sesgos y garantiza alineación con valores organizacionales.

# constitutional_agent.py
import asyncio
from dataclasses import dataclass
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class ConstitutionalPrinciple:
    name: str
    description: str
    check_prompt: str
    severity: str  # "critical", "major", "minor"


CODE_CONSTITUTION = [
    ConstitutionalPrinciple(
        name="no_secrets_hardcodeados",
        description="El código no debe contener contraseñas, API keys o tokens hardcodeados.",
        check_prompt="¿El código contiene credenciales, contraseñas, API keys o tokens hardcodeados (no en variables de entorno)? Responde SI o NO con justificación.",
        severity="critical"
    ),
    ConstitutionalPrinciple(
        name="no_sql_injection",
        description="Las queries SQL deben usar parámetros, nunca interpolación directa de strings.",
        check_prompt="¿El código ejecuta queries SQL con f-strings o concatenación directa de variables? Responde SI o NO con justificación.",
        severity="critical"
    ),
    ConstitutionalPrinciple(
        name="manejo_de_errores",
        description="Las operaciones que pueden fallar deben tener manejo de errores explícito.",
        check_prompt="¿Hay operaciones de red, archivos o bases de datos sin manejo de excepciones? Responde SI o NO con justificación.",
        severity="major"
    ),
    ConstitutionalPrinciple(
        name="no_codigo_comentado",
        description="El código no debe contener bloques de código comentado.",
        check_prompt="¿Hay código Python/JavaScript comentado (no docstrings, sino código que fue desactivado)? Responde SI o NO.",
        severity="minor"
    ),
]


@dataclass
class AuditResult:
    principle: str
    violation: bool
    severity: str
    explanation: str


async def audit_against_principle(
    code: str,
    principle: ConstitutionalPrinciple,
    tools: list[str]
) -> AuditResult:
    """Audita el código contra un principio constitucional."""
    audit_prompt = f"""
Principio a verificar: {principle.name}
Descripción: {principle.description}

Código a auditar:

{code}


{principle.check_prompt}
Justifica tu respuesta con referencias específicas al código.
"""
    texts: list[str] = []
    async for message in query(
        prompt=audit_prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=5)
    ):
        if hasattr(message, "content") and message.content:
            texts.append(str(message.content))

    response = "\n".join(texts).upper()
    violation = "SI" in response and "NO" not in response[:20]

    return AuditResult(
        principle=principle.name,
        violation=violation,
        severity=principle.severity,
        explanation="\n".join(texts)[:300]
    )


async def constitutional_agent(
    task: str,
    constitution: list[ConstitutionalPrinciple] | None = None,
    max_revision_cycles: int = 3,
    tools: list[str] | None = None
) -> tuple[str, list[AuditResult]]:
    """
    Agente con auto-auditoría constitucional.

    Flujo: Generar → Auditar → Revisar si hay violaciones críticas → Retornar

    Returns:
        (código_final, resultados_de_auditoría)
    """
    if constitution is None:
        constitution = CODE_CONSTITUTION
    if tools is None:
        tools = ["Bash"]

    audit_tools = []  # El auditor no necesita herramientas externas

    # Paso 1: Generar el código
    print("[Constitutional] Generando código inicial...")
    gen_results: list[str] = []
    async for message in query(
        prompt=task,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
    ):
        if hasattr(message, "content") and message.content:
            gen_results.append(str(message.content))

    code = "\n".join(gen_results)

    for cycle in range(max_revision_cycles):
        print(f"[Constitutional] Ciclo de auditoría {cycle + 1}/{max_revision_cycles}")

        # Paso 2: Auditar en paralelo contra todos los principios
        audit_tasks = [
            audit_against_principle(code, principle, audit_tools)
            for principle in constitution
        ]
        audit_results = await asyncio.gather(*audit_tasks)

        violations = [r for r in audit_results if r.violation]
        critical_violations = [r for r in violations if r.severity == "critical"]

        for result in audit_results:
            status = "VIOLACION" if result.violation else "OK"
            print(f"  [{status}] {result.principle} ({result.severity})")

        # Paso 3: Si no hay violaciones críticas, aprobar
        if not critical_violations:
            print(f"[Constitutional] Aprobado. Violaciones menores: {len(violations) - len(critical_violations)}")
            return code, list(audit_results)

        # Paso 4: Corregir violaciones críticas
        violations_text = "\n".join([
            f"- {v.principle}: {v.explanation[:200]}"
            for v in critical_violations
        ])

        revision_prompt = f"""
Tu código generado tiene violaciones de seguridad críticas:

{violations_text}

Código original:
{code}

Corrige TODAS las violaciones críticas listadas. Produce el código corregido completo.
"""
        rev_results: list[str] = []
        async for message in query(
            prompt=revision_prompt,
            options=ClaudeCodeOptions(allowed_tools=tools, max_turns=15)
        ):
            if hasattr(message, "content") and message.content:
                rev_results.append(str(message.content))
        code = "\n".join(rev_results)

    print("[Constitutional] Máximo de ciclos alcanzado")
    return code, list(audit_results)

13. Patrón Speculative Execution

Ejecutar Múltiples Estrategias en Paralelo, Usar la Primera Exitosa

Cuando hay incertidumbre sobre qué estrategia resolverá el problema, ejecuta varias en paralelo y usa la que termine exitosamente primero. Las demás se cancelan.

flowchart LR
    Task["Tarea de Refactoring"] --> |fork| S1["Estrategia 1:\nExtract Method"]
    Task --> |fork| S2["Estrategia 2:\nDecompose Conditional"]
    Task --> |fork| S3["Estrategia 3:\nReplace Temp with Query"]

    S1 --> |termina primero| Race{{"Race\n(primero en completar)"}
    S2 --> |todavía corriendo| Race
    S3 --> |todavía corriendo| Race

    Race --> |cancela| S2
    Race --> |cancela| S3
    Race --> Result["Resultado Final\n(Estrategia 1)"]

Implementación con asyncio

# speculative_execution.py
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Coroutine
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class StrategyResult:
    strategy_name: str
    output: str
    success: bool
    error: str | None = None


async def run_strategy(
    strategy_name: str,
    task: str,
    strategy_hint: str,
    tools: list[str],
    max_turns: int = 25,
) -> StrategyResult:
    """Ejecuta una estrategia específica para resolver la tarea."""
    prompt = f"""
Estrategia a usar: {strategy_hint}

Tarea: {task}

Aplica ESPECÍFICAMENTE la estrategia indicada, no otras.
Al terminar, verifica que la solución es correcta ejecutando tests si están disponibles.
"""
    texts: list[str] = []
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(allowed_tools=tools, max_turns=max_turns)
        ):
            if hasattr(message, "content") and message.content:
                texts.append(str(message.content))

        output = "\n".join(texts)
        success = "error" not in output.lower()[:100]
        return StrategyResult(strategy_name=strategy_name, output=output, success=success)

    except Exception as e:
        return StrategyResult(
            strategy_name=strategy_name,
            output="",
            success=False,
            error=str(e)
        )


async def speculative_execution(
    task: str,
    strategies: list[dict[str, str]],
    tools: list[str] | None = None,
    timeout_per_strategy: float = 120.0,
) -> StrategyResult:
    """
    Ejecuta múltiples estrategias en paralelo.
    Retorna la primera que termine exitosamente.
    Cancela las demás.

    Args:
        task: Descripción de la tarea
        strategies: Lista de dicts con 'name' y 'hint' para cada estrategia
        tools: Herramientas disponibles
        timeout_per_strategy: Timeout máximo en segundos

    Returns:
        El resultado de la estrategia ganadora
    """
    if tools is None:
        tools = ["Read", "Edit", "Bash"]

    print(f"[Speculative] Lanzando {len(strategies)} estrategias en paralelo...")

    async def strategy_with_timeout(strategy: dict) -> StrategyResult:
        try:
            return await asyncio.wait_for(
                run_strategy(strategy["name"], task, strategy["hint"], tools),
                timeout=timeout_per_strategy
            )
        except asyncio.TimeoutError:
            return StrategyResult(
                strategy_name=strategy["name"],
                output="",
                success=False,
                error=f"Timeout después de {timeout_per_strategy}s"
            )

    # Crear todas las tareas
    coroutines = [strategy_with_timeout(s) for s in strategies]
    tasks = [asyncio.create_task(coro) for coro in coroutines]

    winner: StrategyResult | None = None

    try:
        # Esperar la primera que termine exitosamente
        for coro in asyncio.as_completed([asyncio.shield(t) for t in tasks]):
            result = await coro
            print(f"[Speculative] Estrategia '{result.strategy_name}' terminó: success={result.success}")
            if result.success and winner is None:
                winner = result
                break  # Tenemos un ganador
    finally:
        # Cancelar todas las tareas pendientes
        for t in tasks:
            if not t.done():
                t.cancel()
                print(f"[Speculative] Cancelando tarea pendiente...")

    if winner is None:
        # Ninguna fue exitosa, retornar la primera
        completed = [t for t in tasks if t.done() and not t.cancelled()]
        if completed:
            winner = completed[0].result()
        else:
            winner = StrategyResult(
                strategy_name="none",
                output="Ninguna estrategia completó exitosamente",
                success=False
            )

    print(f"[Speculative] Ganadora: '{winner.strategy_name}'")
    return winner


# Ejemplo: 3 estrategias de refactoring en paralelo
async def main():
    task = """
    Refactoriza la función process_data() en src/processor.py.
    La función tiene 150 líneas y hace demasiadas cosas.
    El código refactorizado debe pasar todos los tests existentes.
    """

    strategies = [
        {
            "name": "extract_method",
            "hint": "Extrae submétodos con responsabilidad única. Cada método debe hacer exactamente una cosa."
        },
        {
            "name": "decompose_conditional",
            "hint": "Simplifica los condicionales complejos usando guard clauses y early returns."
        },
        {
            "name": "pipeline_pattern",
            "hint": "Convierte la función en un pipeline de transformaciones pequeñas encadenadas."
        }
    ]

    winner = await speculative_execution(task, strategies, timeout_per_strategy=90.0)
    print(f"\nEstrategia ganadora: {winner.strategy_name}")
    print(winner.output[:500])


if __name__ == "__main__":
    asyncio.run(main())

14. Sistemas de Agentes Reactivos

Agentes que Reaccionan a Eventos del Filesystem

Los agentes reactivos no esperan instrucciones explícitas: monitorean el ambiente y actúan cuando algo cambia. Son ideales para CI/CD, pipelines de datos y monitoreo.

# reactive_agents.py
import asyncio
import os
import time
from pathlib import Path
from dataclasses import dataclass, field
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class FileEvent:
    path: str
    event_type: str  # "created", "modified"
    timestamp: float = field(default_factory=time.time)


class AgentFileWatcher(FileSystemEventHandler):
    """
    File watcher que pone eventos en una cola async
    para que el agente los procese.
    """

    def __init__(self, event_queue: asyncio.Queue, patterns: list[str] | None = None):
        self.event_queue = event_queue
        self.patterns = patterns or ["*.py", "*.ts", "*.json"]
        self._loop = asyncio.get_event_loop()

    def _matches_pattern(self, path: str) -> bool:
        from fnmatch import fnmatch
        return any(fnmatch(os.path.basename(path), p) for p in self.patterns)

    def on_created(self, event):
        if not event.is_directory and self._matches_pattern(event.src_path):
            file_event = FileEvent(path=event.src_path, event_type="created")
            self._loop.call_soon_threadsafe(
                self.event_queue.put_nowait, file_event
            )

    def on_modified(self, event):
        if not event.is_directory and self._matches_pattern(event.src_path):
            file_event = FileEvent(path=event.src_path, event_type="modified")
            self._loop.call_soon_threadsafe(
                self.event_queue.put_nowait, file_event
            )


async def file_processor_agent(event: FileEvent, cwd: str) -> str:
    """Agente que procesa un archivo cuando es creado o modificado."""
    if event.event_type == "created":
        task = f"Nuevo archivo detectado: {event.path}. Revisa si tiene problemas obvios de calidad o seguridad."
    else:
        task = f"Archivo modificado: {event.path}. Verifica que los cambios no introducen regresiones."

    texts: list[str] = []
    async for message in query(
        prompt=task,
        options=ClaudeCodeOptions(
            cwd=cwd,
            allowed_tools=["Read"],
            model="claude-haiku-4-5",
            max_turns=5,
        )
    ):
        if hasattr(message, "content") and message.content:
            texts.append(str(message.content))

    return "\n".join(texts)


async def reactive_file_agent(
    watch_dir: str,
    patterns: list[str] | None = None,
    duration_seconds: float = 60.0
) -> None:
    """
    Agente reactivo que monitorea un directorio y procesa archivos.

    Args:
        watch_dir: Directorio a monitorear
        patterns: Patrones de archivos a monitorear (glob)
        duration_seconds: Tiempo de monitoreo
    """
    event_queue: asyncio.Queue[FileEvent] = asyncio.Queue()
    handler = AgentFileWatcher(event_queue, patterns)

    observer = Observer()
    observer.schedule(handler, watch_dir, recursive=True)
    observer.start()

    print(f"[Reactive] Monitoreando {watch_dir} por {duration_seconds}s...")

    try:
        deadline = time.time() + duration_seconds
        while time.time() < deadline:
            try:
                event = await asyncio.wait_for(event_queue.get(), timeout=1.0)
                print(f"[Reactive] Evento: {event.event_type} en {event.path}")
                result = await file_processor_agent(event, watch_dir)
                print(f"[Reactive] Análisis: {result[:200]}")
            except asyncio.TimeoutError:
                continue  # No hay eventos, seguir esperando
    finally:
        observer.stop()
        observer.join()
        print("[Reactive] Monitoreo terminado")


# ─── Watchdog Pattern: Agente que supervisa a otro agente ──────

class AgentWatchdog:
    """
    Supervisore de agentes: si el agente supervisado no produce
    output en X segundos, lo considera colgado y lo reinicia.
    """

    def __init__(self, timeout_seconds: float = 60.0, max_restarts: int = 3):
        self.timeout_seconds = timeout_seconds
        self.max_restarts = max_restarts
        self._last_heartbeat = time.time()
        self._restart_count = 0

    def heartbeat(self) -> None:
        """El agente supervisado debe llamar a esto periódicamente."""
        self._last_heartbeat = time.time()

    def is_alive(self) -> bool:
        """True si el agente respondió recientemente."""
        return time.time() - self._last_heartbeat < self.timeout_seconds

    async def watch(self, agent_coro, task_description: str) -> str:
        """
        Supervisa una corutina de agente con timeout y reintentos.
        """
        for attempt in range(self.max_restarts + 1):
            try:
                result = await asyncio.wait_for(
                    agent_coro(task_description),
                    timeout=self.timeout_seconds
                )
                return result
            except asyncio.TimeoutError:
                self._restart_count += 1
                print(f"[Watchdog] Timeout en intento {attempt + 1}/{self.max_restarts + 1}")
                if attempt < self.max_restarts:
                    print(f"[Watchdog] Reiniciando agente...")
                    await asyncio.sleep(2.0)
                    continue
                raise RuntimeError(f"Agente no respondió después de {self.max_restarts + 1} intentos")


# ─── Agente que monitorea logs en tiempo real ─────────────────

async def log_monitor_agent(log_file: str, check_interval: float = 5.0) -> None:
    """
    Lee un archivo de log en tiempo real y actúa cuando detecta errores.
    Similar a `tail -f` pero con inteligencia del agente.
    """
    last_size = 0

    print(f"[LogMonitor] Monitoreando {log_file}...")

    while True:
        await asyncio.sleep(check_interval)

        try:
            current_size = os.path.getsize(log_file)
        except FileNotFoundError:
            continue

        if current_size <= last_size:
            continue

        # Hay nuevas líneas — leerlas
        with open(log_file) as f:
            f.seek(last_size)
            new_content = f.read()

        last_size = current_size

        # Verificar si hay errores en las nuevas líneas
        if "ERROR" in new_content or "CRITICAL" in new_content:
            print(f"[LogMonitor] Error detectado en logs, analizando...")

            texts: list[str] = []
            async for message in query(
                prompt=f"""
                Analiza estas líneas de log que contienen errores:
                {new_content[-2000:]}

                Determina:
                1. Tipo de error
                2. Causa probable
                3. Acción recomendada (¿alerta? ¿restart? ¿ignorar?)
                """,
                options=ClaudeCodeOptions(
                    allowed_tools=[],
                    model="claude-haiku-4-5",
                    max_turns=3
                )
            ):
                if hasattr(message, "content") and message.content:
                    texts.append(str(message.content))

            analysis = "\n".join(texts)
            print(f"[LogMonitor] Análisis: {analysis[:300]}")

15. Federación de Agentes

Múltiples Instancias del SDK en Diferentes Máquinas

La federación permite distribuir trabajo de agentes entre múltiples máquinas, coordinadas por un orquestador central. Es útil cuando la carga de trabajo supera lo que una instancia puede manejar.

# federation.py
import asyncio
import httpx
import uuid
from dataclasses import dataclass
from typing import Any
from claude_code_sdk import query, ClaudeCodeOptions


@dataclass
class AgentNode:
    """Representa un nodo de agente en el cluster."""
    node_id: str
    host: str
    port: int
    capabilities: list[str]
    current_load: int = 0

    @property
    def base_url(self) -> str:
        return f"http://{self.host}:{self.port}"


class AgentCluster:
    """
    Cluster de agentes distribuidos.
    Cada nodo expone una API REST para recibir tareas.
    """

    def __init__(self, nodes: list[AgentNode]):
        self.nodes = nodes

    def select_node(self, required_capabilities: list[str] | None = None) -> AgentNode | None:
        """Selecciona el nodo disponible más adecuado."""
        candidates = self.nodes

        if required_capabilities:
            candidates = [
                n for n in candidates
                if all(cap in n.capabilities for cap in required_capabilities)
            ]

        if not candidates:
            return None

        # Elegir el nodo con menor carga
        return min(candidates, key=lambda n: n.current_load)

    async def dispatch_task(
        self,
        task: str,
        required_capabilities: list[str] | None = None,
        timeout: float = 120.0
    ) -> dict[str, Any]:
        """Despacha una tarea al nodo más adecuado."""
        node = self.select_node(required_capabilities)
        if not node:
            raise RuntimeError(f"No hay nodos disponibles para: {required_capabilities}")

        node.current_load += 1
        task_id = str(uuid.uuid4())[:8]

        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(
                    f"{node.base_url}/tasks",
                    json={"task_id": task_id, "prompt": task}
                )
                response.raise_for_status()
                return response.json()
        finally:
            node.current_load = max(0, node.current_load - 1)

    async def broadcast_task(
        self,
        task: str,
        aggregation: str = "first_success"
    ) -> list[dict[str, Any]]:
        """
        Envía la misma tarea a todos los nodos.
        Útil para ensemble voting distribuido.
        """
        tasks = [self.dispatch_task(task) for _ in self.nodes]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if isinstance(r, dict)]


# ─── Servidor de agente (FastAPI) ─────────────────────────────
# Este sería el código que corre en cada nodo del cluster

AGENT_SERVER_CODE = '''
# agent_server.py (corre en cada nodo)
from fastapi import FastAPI
from pydantic import BaseModel
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio

app = FastAPI()

class TaskRequest(BaseModel):
    task_id: str
    prompt: str
    tools: list[str] = ["Read", "Bash"]
    max_turns: int = 20

@app.post("/tasks")
async def process_task(request: TaskRequest):
    texts = []
    async for message in query(
        prompt=request.prompt,
        options=ClaudeCodeOptions(
            allowed_tools=request.tools,
            max_turns=request.max_turns
        )
    ):
        if hasattr(message, "content") and message.content:
            texts.append(str(message.content))

    return {
        "task_id": request.task_id,
        "output": "\\n".join(texts),
        "success": True
    }

@app.get("/health")
async def health():
    return {"status": "ok", "node_id": "node-1"}
'''


# Ejemplo de uso del cluster
async def main():
    # Definir el cluster (en prod, estos valores vendrían de service discovery)
    nodes = [
        AgentNode("node-1", "agent-1.internal", 8001, ["python", "review"]),
        AgentNode("node-2", "agent-2.internal", 8002, ["python", "typescript", "review"]),
        AgentNode("node-3", "agent-3.internal", 8003, ["security", "review"]),
    ]

    cluster = AgentCluster(nodes)

    # Despachar tarea a nodo con capacidad de TypeScript
    result = await cluster.dispatch_task(
        task="Revisa el código TypeScript en src/ y reporta problemas de tipos.",
        required_capabilities=["typescript"]
    )
    print(f"Resultado del nodo: {result}")

16. Agentes con Memoria a Largo Plazo

Memoria Episódica y Semántica con Vector Database

La memoria a largo plazo permite que un agente recuerde decisiones pasadas, aprendizajes del proyecto y contexto acumulado. Esto es fundamental para agentes que trabajan en proyectos a lo largo del tiempo.

flowchart TB
    Query["Nueva tarea del agente"] --> Retrieval["Retrieval\n(búsqueda semántica)"]

    subgraph VectorDB["Vector Database (ChromaDB / Qdrant)"]
        EpisodicMem["Memoria Episódica\n(conversaciones pasadas)"]
        SemanticMem["Memoria Semántica\n(conocimiento del dominio)"]
        ProceduralMem["Memoria Procedimental\n(cómo hacer las cosas)"]
    end

    Retrieval --> EpisodicMem
    Retrieval --> SemanticMem
    Retrieval --> ProceduralMem

    EpisodicMem --> |top-k relevantes| Context["Contexto enriquecido"]
    SemanticMem --> |top-k relevantes| Context
    ProceduralMem --> |top-k relevantes| Context

    Context --> Agent["Agente con contexto\nde memoria"]
    Agent --> Result["Resultado"]
    Result --> |extrae nuevo conocimiento| Storage["Almacenamiento\nen Vector DB"]
    Storage --> VectorDB

Implementación con ChromaDB

# long_term_memory.py
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from claude_code_sdk import query, ClaudeCodeOptions

# Para producción: pip install chromadb
# Aquí usamos una implementación simplificada para el ejemplo
try:
    import chromadb
    CHROMADB_AVAILABLE = True
except ImportError:
    CHROMADB_AVAILABLE = False


@dataclass
class MemoryEntry:
    content: str
    memory_type: str  # "episodic", "semantic", "procedural"
    metadata: dict = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())


class AgentMemoryStore:
    """
    Almacén de memoria con búsqueda semántica.
    Usa ChromaDB si está disponible, fallback a búsqueda por keywords.
    """

    def __init__(self, project_id: str, persist_dir: str = "/tmp/agent_memory"):
        self.project_id = project_id
        self.persist_dir = persist_dir
        self._entries: list[MemoryEntry] = []

        if CHROMADB_AVAILABLE:
            self._client = chromadb.PersistentClient(path=persist_dir)
            self._collection = self._client.get_or_create_collection(
                name=f"project_{project_id}",
                metadata={"hnsw:space": "cosine"}
            )
        else:
            print("[Memory] ChromaDB no disponible, usando búsqueda por keywords")

    def store_episodic(
        self,
        conversation_summary: str,
        task: str,
        outcome: str,
        tags: list[str] | None = None
    ) -> str:
        """Almacena un recuerdo episódico (resumen de conversación)."""
        content = f"TAREA: {task}\nRESULTADO: {outcome}\nRESUMEN: {conversation_summary}"
        entry = MemoryEntry(
            content=content,
            memory_type="episodic",
            metadata={"task": task[:100], "tags": tags or []}
        )
        return self._store_entry(entry)

    def store_semantic(
        self,
        knowledge: str,
        category: str,
        tags: list[str] | None = None
    ) -> str:
        """Almacena conocimiento semántico (hechos del dominio)."""
        entry = MemoryEntry(
            content=knowledge,
            memory_type="semantic",
            metadata={"category": category, "tags": tags or []}
        )
        return self._store_entry(entry)

    def store_procedural(
        self,
        procedure: str,
        use_case: str,
        tags: list[str] | None = None
    ) -> str:
        """Almacena conocimiento procedimental (cómo hacer algo)."""
        content = f"CASO DE USO: {use_case}\nPROCEDIMIENTO: {procedure}"
        entry = MemoryEntry(
            content=content,
            memory_type="procedural",
            metadata={"use_case": use_case[:100], "tags": tags or []}
        )
        return self._store_entry(entry)

    def _store_entry(self, entry: MemoryEntry) -> str:
        """Almacena una entrada en la memoria."""
        entry_id = f"{entry.memory_type}_{len(self._entries)}"
        self._entries.append(entry)

        if CHROMADB_AVAILABLE:
            self._collection.add(
                documents=[entry.content],
                metadatas=[{**entry.metadata, "type": entry.memory_type, "timestamp": entry.timestamp}],
                ids=[entry_id]
            )

        return entry_id

    def retrieve(
        self,
        query_text: str,
        top_k: int = 5,
        memory_type: str | None = None
    ) -> list[MemoryEntry]:
        """
        Recupera las memorias más relevantes para la query.
        Usa búsqueda semántica con ChromaDB o keyword search como fallback.
        """
        if CHROMADB_AVAILABLE:
            where = {"type": memory_type} if memory_type else None
            results = self._collection.query(
                query_texts=[query_text],
                n_results=top_k,
                where=where
            )
            entries = []
            for i, doc in enumerate(results["documents"][0]):
                meta = results["metadatas"][0][i]
                entries.append(MemoryEntry(
                    content=doc,
                    memory_type=meta.get("type", "unknown"),
                    metadata=meta
                ))
            return entries
        else:
            # Búsqueda por keywords como fallback
            query_words = set(query_text.lower().split())
            scored = []
            for entry in self._entries:
                if memory_type and entry.memory_type != memory_type:
                    continue
                doc_words = set(entry.content.lower().split())
                score = len(query_words & doc_words) / max(len(query_words), 1)
                if score > 0:
                    scored.append((score, entry))
            scored.sort(key=lambda x: x[0], reverse=True)
            return [entry for _, entry in scored[:top_k]]

    def build_memory_context(self, query_text: str, max_chars: int = 2000) -> str:
        """Construye un bloque de contexto de memoria para incluir en el prompt."""
        relevant = self.retrieve(query_text, top_k=5)
        if not relevant:
            return ""

        lines = ["=== MEMORIA RELEVANTE DEL PROYECTO ==="]
        total_chars = 0
        for entry in relevant:
            content_preview = entry.content[:400]
            if total_chars + len(content_preview) > max_chars:
                break
            lines.append(f"\n[{entry.memory_type.upper()}] {entry.metadata.get('timestamp', '')[:10]}")
            lines.append(content_preview)
            total_chars += len(content_preview)

        return "\n".join(lines)


async def agent_with_long_term_memory(
    task: str,
    memory: AgentMemoryStore,
    tools: list[str] | None = None
) -> str:
    """
    Agente que usa memoria a largo plazo.

    Antes de cada query, recupera memorias relevantes.
    Después de cada query, extrae y almacena nuevo conocimiento.
    """
    if tools is None:
        tools = ["Read", "Bash"]

    # Recuperar memoria relevante
    memory_context = memory.build_memory_context(task)

    prompt = f"""
{memory_context}

Tarea actual: {task}

Notas:
- Aplica el conocimiento previo si es relevante
- Si tomas decisiones de arquitectura o diseño, explica el razonamiento
- Al final de tu respuesta, incluye una sección "=== NUEVO CONOCIMIENTO ===" con
  cualquier aprendizaje nuevo que debería recordarse para el futuro
"""

    texts: list[str] = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
    ):
        if hasattr(message, "content") and message.content:
            texts.append(str(message.content))

    full_output = "\n".join(texts)

    # Extraer y almacenar nuevo conocimiento
    if "=== NUEVO CONOCIMIENTO ===" in full_output:
        knowledge_section = full_output.split("=== NUEVO CONOCIMIENTO ===")[1].strip()
        if knowledge_section:
            # Determinar tipo de conocimiento
            if any(word in task.lower() for word in ["cómo", "pasos", "procedimiento", "proceso"]):
                memory.store_procedural(
                    procedure=knowledge_section[:500],
                    use_case=task[:100],
                    tags=task.split()[:5]
                )
            else:
                memory.store_episodic(
                    conversation_summary=knowledge_section[:500],
                    task=task[:100],
                    outcome="completado",
                    tags=task.split()[:5]
                )
            print("[Memory] Nuevo conocimiento almacenado")

    return full_output


# Ejemplo completo: agente de desarrollo con memoria persistente
async def development_agent_session():
    """
    Simulación de sesión de trabajo con memoria persistente.
    El agente recuerda decisiones de sesiones anteriores.
    """
    memory = AgentMemoryStore(project_id="mi_proyecto")

    # Pre-cargar conocimiento del proyecto
    memory.store_semantic(
        knowledge="El proyecto usa FastAPI con SQLAlchemy. Las rutas están en src/api/routes/. Los modelos en src/models/.",
        category="arquitectura",
        tags=["fastapi", "sqlalchemy", "estructura"]
    )
    memory.store_semantic(
        knowledge="Convención: todas las funciones deben tener type hints y docstrings. Tests con pytest-asyncio.",
        category="convenciones",
        tags=["convenciones", "testing"]
    )
    memory.store_procedural(
        procedure="Para agregar una nueva ruta: 1) Crear endpoint en src/api/routes/{nombre}.py, 2) Registrar en src/api/__init__.py, 3) Agregar tests en tests/test_{nombre}.py",
        use_case="agregar nueva ruta a la API",
        tags=["ruta", "api", "nueva funcionalidad"]
    )

    # Sesión de trabajo
    tasks = [
        "Agrega un endpoint GET /users/{user_id}/profile que retorne el perfil del usuario",
        "¿Cuál es la convención de naming para los tests en este proyecto?",
        "Crea el test para el endpoint que acabas de crear",
    ]

    for task in tasks:
        print(f"\n{'='*60}")
        print(f"TAREA: {task}")
        print('='*60)
        result = await agent_with_long_term_memory(task, memory)
        print(f"RESULTADO: {result[:400]}...")


if __name__ == "__main__":
    asyncio.run(development_agent_session())

TypeScript: Implementación con Qdrant

// long-term-memory.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

// En producción: npm install @qdrant/js-client-rest
// Aquí simulamos la interfaz

interface MemoryEntry {
  id: string;
  content: string;
  type: "episodic" | "semantic" | "procedural";
  tags: string[];
  timestamp: string;
  score?: number;
}

class KeywordMemoryStore {
  private entries: MemoryEntry[] = [];
  private counter = 0;

  store(entry: Omit<MemoryEntry, "id" | "timestamp">): string {
    const id = `${entry.type}_${++this.counter}`;
    this.entries.push({
      ...entry,
      id,
      timestamp: new Date().toISOString(),
    });
    return id;
  }

  retrieve(queryText: string, topK = 5, type?: string): MemoryEntry[] {
    const queryWords = new Set(queryText.toLowerCase().split(/\s+/));
    const scored = this.entries
      .filter((e) => !type || e.type === type)
      .map((entry) => {
        const docWords = new Set(
          (entry.content + " " + entry.tags.join(" ")).toLowerCase().split(/\s+/)
        );
        const intersection = [...queryWords].filter((w) => docWords.has(w));
        const score = intersection.length / Math.max(queryWords.size, 1);
        return { ...entry, score };
      })
      .filter((e) => (e.score ?? 0) > 0)
      .sort((a, b) => (b.score ?? 0) - (a.score ?? 0))
      .slice(0, topK);

    return scored;
  }

  buildContext(queryText: string, maxChars = 2000): string {
    const relevant = this.retrieve(queryText);
    if (!relevant.length) return "";

    const lines = ["=== MEMORIA RELEVANTE ==="];
    let totalChars = 0;
    for (const entry of relevant) {
      const preview = entry.content.slice(0, 400);
      if (totalChars + preview.length > maxChars) break;
      lines.push(`\n[${entry.type.toUpperCase()}] ${entry.timestamp.slice(0, 10)}`);
      lines.push(preview);
      totalChars += preview.length;
    }
    return lines.join("\n");
  }
}

async function agentWithLongTermMemory(
  task: string,
  memory: KeywordMemoryStore,
  tools: string[] = ["Read", "Bash"]
): Promise<string> {
  const memoryContext = memory.buildContext(task);

  const prompt = `${memoryContext}\n\nTarea: ${task}\n\nSi descubres conocimiento nuevo útil, inclúyelo en "=== NUEVO CONOCIMIENTO ===".`;

  const texts: string[] = [];
  for await (const message of query(prompt, {
    allowedTools: tools,
    maxTurns: 20,
  } as ClaudeCodeOptions)) {
    if ("content" in message && message.content) {
      texts.push(String(message.content));
    }
  }

  const output = texts.join("\n");

  if (output.includes("=== NUEVO CONOCIMIENTO ===")) {
    const knowledge = output.split("=== NUEVO CONOCIMIENTO ===")[1].trim();
    if (knowledge) {
      memory.store({
        content: knowledge.slice(0, 500),
        type: "episodic",
        tags: task.split(/\s+/).slice(0, 5),
      });
      console.log("[Memory] Nuevo conocimiento almacenado");
    }
  }

  return output;
}

export { KeywordMemoryStore, agentWithLongTermMemory };

Resumen Expandido del Capítulo

flowchart TB
    subgraph Calidad["Para mejorar calidad"]
        Reflection["Reflection\nGenerar → Criticar → Mejorar"]
        CriticActor["Critic-Actor Profundo\nRúbrica con pesos"]
        CrossVerify["Verificación Cruzada\nGenerador + Verificador"]
        Constitutional["Constitutional AI\nAuto-auditoría de principios"]
        Ensemble["Ensemble\nMúltiples → Voting"]
    end

    subgraph Decision["Para mejores decisiones"]
        Debate["Debate\nPro vs Contra + Árbitro"]
        Router["Routing\nClasificar → Especialista"]
        Speculative["Speculative Execution\nRace de estrategias"]
    end

    subgraph Learning["Para aprender y adaptarse"]
        SelfImprove["Self-Improvement\nAprender de errores"]
        SharedMem["Memoria Compartida\nConocimiento acumulado"]
        LongTerm["Memoria a Largo Plazo\nVector DB + Retrieval"]
    end

    subgraph Coordination["Para coordinación flexible"]
        Artifacts["Artefactos\nComunicación por archivos"]
        Reactive["Agentes Reactivos\nEvent-driven"]
        Federation["Federación\nCluster distribuido"]
        HumanSuper["Supervisión Humana\nAprobación escalable"]
    end

Guía de selección de patrón:

SituaciónPatrón recomendado
Documentación técnica que debe ser excelenteCritic-Actor Profundo con rúbrica
Código que debe cumplir políticas de seguridadConstitutional AI
Decisiones de arquitectura con trade-offsDebate
Múltiples enfoques, usar el más rápidoSpeculative Execution
Archivos que se crean automáticamenteAgentes Reactivos
Proyecto con historia de decisionesMemoria a Largo Plazo
Alta carga de trabajo de reviewFederación de Agentes
Aprobación de acciones irreversiblesSupervisión Humana

Próximo capítulo: Capítulo 20: SDK Internals y Extensiones — Cómo funciona el SDK por dentro y cómo extenderlo.