Capítulo 19: Patrones Multi-Agente Avanzados
Capítulo 19: Patrones Multi-Agente Avanzados
Los sistemas multi-agente simples (hub-and-spoke, pipeline, paralelo) tienen límites bien definidos. Para problemas que requieren razonamiento profundo, validación rigurosa o especialización extrema, necesitamos patrones más sofisticados. Este capítulo cubre los patrones avanzados que emergen cuando múltiples agentes colaboran de formas no triviales.
1. Más Allá del Hub-and-Spoke
Limitaciones de los Patrones Básicos
Los patrones del capítulo anterior — orquestador central con subagentes — tienen limitaciones importantes:
flowchart TB
subgraph Basico["Patrón Básico (Hub-and-Spoke)"]
O["Orquestador"] --> A1["Agente 1"]
O --> A2["Agente 2"]
O --> A3["Agente 3"]
end
subgraph Limitaciones["Limitaciones"]
L1["Sin verificación\nindependiente"]
L2["Sin mejora\niterativa"]
L3["Sin perspectivas\ncontradictorias"]
L4["Sin especialización\nreal"]
end
Basico -.-> Limitaciones
¿Cuándo necesitas patrones más complejos?
| Situación | Patrón recomendado |
|---|---|
| El código generado debe ser correcto, no solo plausible | Reflection o Verificación cruzada |
| Decisiones de arquitectura con trade-offs importantes | Debate |
| Múltiples enfoques posibles, necesitas el mejor | Ensemble con voting |
| El agente comete errores sistemáticos | Self-improvement loop |
| Tareas muy distintas requieren expertise especializado | Routing con especialistas |
| Agentes en paralelo sin coordinador explícito | Comunicación por artefactos |
| El humano debe aprobar decisiones críticas | Supervisión humana escalable |
| Los agentes deben recordar conocimiento acumulado | Memoria compartida con vector DB |
Taxonomía de Patrones Multi-Agente
flowchart TB
Patrones["Patrones Multi-Agente"] --> Iterativos["Iterativos\n(mejora en ciclos)"]
Patrones --> Colaborativos["Colaborativos\n(múltiples perspectivas)"]
Patrones --> Especializados["Especializados\n(routing por capacidad)"]
Patrones --> Distribuidos["Distribuidos\n(sin coordinador central)"]
Patrones --> Adaptativos["Adaptativos\n(aprenden de errores)"]
Iterativos --> Reflection["Reflection"]
Iterativos --> SelfImprovement["Self-Improvement"]
Colaborativos --> Debate["Debate"]
Colaborativos --> Ensemble["Ensemble + Voting"]
Colaborativos --> CrossVerification["Verificación Cruzada"]
Especializados --> Router["Router + Especialistas"]
Distribuidos --> ArtifactComm["Comunicación por Artefactos"]
Adaptativos --> SharedMemory["Memoria Compartida"]
2. Patrón Reflection
Concepto: Generar → Criticar → Mejorar
El patrón Reflection es la forma más poderosa de mejorar la calidad de outputs cuando la calidad importa más que la velocidad. Un agente genera, otro critica con rigor, y el primero mejora basado en la crítica.
sequenceDiagram
participant User
participant Generator as Agente Generador
participant Critic as Agente Crítico
participant Judge as Juez de Calidad
User->>Generator: Tarea inicial
loop Ciclo de Reflexión (N iteraciones)
Generator->>Generator: Genera solución v(N)
Generator->>Critic: Entrega solución v(N)
Critic->>Critic: Analiza fallas, inconsistencias
Critic->>Generator: Critica detallada
Generator->>Generator: Mejora basada en crítica
Generator->>Judge: Solución mejorada v(N+1)
Judge->>Judge: ¿Calidad suficiente?
Judge-->>Generator: No suficiente → continuar
Judge-->>User: Suficiente → retornar resultado
end
Implementación Python
# reflection_pattern.py
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class ReflectionConfig:
max_iterations: int = 3
quality_threshold: float = 0.8
generator_tools: list[str] = None
critic_tools: list[str] = None
def __post_init__(self):
if self.generator_tools is None:
self.generator_tools = ["Read", "Edit", "Bash"]
if self.critic_tools is None:
self.critic_tools = ["Read", "Bash"]
async def _run_agent(prompt: str, tools: list[str], max_turns: int = 30) -> str:
"""Ejecuta un agente y retorna el resultado como string."""
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=tools,
max_turns=max_turns
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
async def reflection_pattern(
task: str,
config: ReflectionConfig | None = None
) -> str:
"""
Patrón Reflection: genera, critica y mejora iterativamente.
Args:
task: Descripción de la tarea a resolver
config: Configuración del ciclo de reflexión
Returns:
Solución mejorada tras N iteraciones de reflexión
"""
if config is None:
config = ReflectionConfig()
print(f"[Reflection] Iniciando con tarea: {task[:80]}...")
# Paso 1: Generación inicial
generator_prompt = f"""
Tarea: {task}
Produce una solución completa y detallada. No omitas pasos importantes.
Piensa paso a paso antes de escribir la solución.
"""
current_solution = await _run_agent(
generator_prompt,
config.generator_tools
)
print(f"[Reflection] Solución inicial generada ({len(current_solution)} chars)")
# Ciclo de reflexión
for iteration in range(config.max_iterations):
print(f"[Reflection] Iteración {iteration + 1}/{config.max_iterations}")
# Paso 2: Crítica
critic_prompt = f"""
Tarea original: {task}
Solución propuesta:
{current_solution}
Tu rol es ser un CRÍTICO RIGUROSO. Analiza la solución con estos criterios:
1. Correctitud: ¿Es técnicamente correcta? ¿Hay bugs o errores lógicos?
2. Completitud: ¿Cubre todos los casos de la tarea? ¿Hay edge cases faltantes?
3. Calidad: ¿Sigue buenas prácticas? ¿Es mantenible y legible?
4. Seguridad: ¿Hay vulnerabilidades o problemas de seguridad?
5. Performance: ¿Hay problemas de rendimiento obvios?
Para CADA problema encontrado:
- Describe el problema específicamente (no en general)
- Indica la línea o sección afectada
- Sugiere cómo corregirlo
Si la solución es excelente, di "APROBADO" claramente.
Si hay problemas menores, di "APROBADO CON MEJORAS" y lista las mejoras.
Si hay problemas serios, di "RECHAZADO" y explica qué debe cambiar.
"""
critique = await _run_agent(
critic_prompt,
config.critic_tools,
max_turns=15
)
print(f"[Reflection] Crítica: {critique[:200]}...")
# Verificar si la calidad es suficiente
is_approved = (
"APROBADO" in critique.upper() and
"RECHAZADO" not in critique.upper()
)
if is_approved:
print(f"[Reflection] Aprobado en iteración {iteration + 1}")
break
# Paso 3: Mejora basada en crítica
improve_prompt = f"""
Tarea original: {task}
Tu solución anterior:
{current_solution}
Crítica recibida:
{critique}
Mejora tu solución atendiendo TODOS los puntos de la crítica.
Mantén los aspectos que el crítico consideró correctos.
Para cada punto criticado, aplica la corrección sugerida.
Produce la versión mejorada completa (no solo los cambios).
"""
current_solution = await _run_agent(
improve_prompt,
config.generator_tools
)
print(f"[Reflection] Solución mejorada ({len(current_solution)} chars)")
return current_solution
# ============================================================
# Ejemplo: Reflection para código crítico
# ============================================================
async def main():
task = """
Escribe una función Python que parsee y valide JWTs.
Debe verificar la firma, expiración, y claims obligatorios.
Debe manejar todos los casos de error correctamente.
"""
config = ReflectionConfig(
max_iterations=3,
generator_tools=["Bash"], # Solo Bash para generar código
critic_tools=["Bash"]
)
result = await reflection_pattern(task, config)
print("\n=== RESULTADO FINAL ===")
print(result)
if __name__ == "__main__":
asyncio.run(main())
Implementación TypeScript
// reflection-pattern.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface ReflectionConfig {
maxIterations: number;
generatorTools: string[];
criticTools: string[];
}
const DEFAULT_CONFIG: ReflectionConfig = {
maxIterations: 3,
generatorTools: ["Read", "Edit", "Bash"],
criticTools: ["Read", "Bash"],
};
async function runAgent(
prompt: string,
tools: string[],
maxTurns = 30
): Promise<string> {
const results: string[] = [];
for await (const message of query(prompt, {
allowedTools: tools,
maxTurns,
} as ClaudeCodeOptions)) {
if ("content" in message && message.content) {
results.push(String(message.content));
}
}
return results.join("\n");
}
async function reflectionPattern(
task: string,
config: ReflectionConfig = DEFAULT_CONFIG
): Promise<string> {
console.log(`[Reflection] Tarea: ${task.slice(0, 80)}...`);
// Generación inicial
let currentSolution = await runAgent(
`Tarea: ${task}\n\nProduce una solución completa y detallada.`,
config.generatorTools
);
for (let i = 0; i < config.maxIterations; i++) {
console.log(`[Reflection] Iteración ${i + 1}/${config.maxIterations}`);
// Crítica
const critique = await runAgent(
`Tarea: ${task}\n\nSolución propuesta:\n${currentSolution}\n\n` +
`Analiza críticamente. Marca APROBADO o RECHAZADO.`,
config.criticTools,
15
);
const approved =
critique.toUpperCase().includes("APROBADO") &&
!critique.toUpperCase().includes("RECHAZADO");
if (approved) {
console.log(`[Reflection] Aprobado en iteración ${i + 1}`);
break;
}
// Mejora
currentSolution = await runAgent(
`Tarea: ${task}\n\nSolución anterior:\n${currentSolution}\n\n` +
`Crítica:\n${critique}\n\nMejora la solución atendiendo todos los puntos.`,
config.generatorTools
);
}
return currentSolution;
}
export { reflectionPattern, ReflectionConfig };
3. Patrón Debate
Múltiples Agentes Argumentando Posiciones
El patrón Debate es ideal para decisiones que tienen trade-offs reales donde no hay una respuesta objetivamente correcta: elección de tecnología, decisiones de arquitectura, estrategias de producto.
flowchart TD
Question["Pregunta / Decisión"] --> Pro["Agente PRO\n(defiende opción A)"]
Question --> Con["Agente CONTRA\n(defiende opción B)"]
Pro --> |"Argumento 1"| Round1["Ronda 1"]
Con --> |"Contraargumento 1"| Round1
Round1 --> |"Rebatir"| Pro
Round1 --> |"Rebatir"| Con
Pro --> |"Argumento 2"| Round2["Ronda 2"]
Con --> |"Contraargumento 2"| Round2
Round2 --> Arbitro["Árbitro\n(analiza ambas posiciones)"]
Arbitro --> Decision["Decisión final\ncon justificación"]
Implementación Python
# debate_pattern.py
import asyncio
from dataclasses import dataclass
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class DebateConfig:
rounds: int = 2
tools: list[str] = None
def __post_init__(self):
if self.tools is None:
self.tools = ["Bash"] # Solo lectura/análisis, no modificar archivos
async def _agent_respond(prompt: str, tools: list[str]) -> str:
"""Respuesta de un agente."""
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=10)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
async def debate_pattern(
question: str,
option_a: str,
option_b: str,
config: DebateConfig | None = None
) -> dict:
"""
Debate entre dos posiciones con árbitro final.
Returns:
dict con 'winner', 'reasoning', 'transcript'
"""
if config is None:
config = DebateConfig()
transcript = []
# Posiciones iniciales
pro_prompt = f"""
Eres el DEFENSOR de la opción: "{option_a}".
Debes argumentar a favor de esta opción frente a "{option_b}".
La pregunta es: {question}
Presenta tus 3 mejores argumentos. Sé específico con datos y ejemplos reales.
No seas genérico. Conoce las debilidades de tu posición y abórdalas proactivamente.
"""
con_prompt = f"""
Eres el DEFENSOR de la opción: "{option_b}".
Debes argumentar a favor de esta opción frente a "{option_a}".
La pregunta es: {question}
Presenta tus 3 mejores argumentos. Sé específico con datos y ejemplos reales.
No seas genérico. Conoce las debilidades de tu posición y abórdalas proactivamente.
"""
pro_position = await _agent_respond(pro_prompt, config.tools)
con_position = await _agent_respond(con_prompt, config.tools)
transcript.append({"round": 0, "pro": pro_position, "con": con_position})
# Rondas de debate
for round_num in range(1, config.rounds + 1):
pro_rebut_prompt = f"""
Contexto: Debate sobre "{question}"
Tu posición: {option_a}
Tu argumento anterior: {transcript[-1]['pro']}
El oponente respondió:
{transcript[-1]['con']}
REBATE sus argumentos. Para cada punto del oponente:
1. Reconoce si tiene algo de razón (honestidad intelectual)
2. Explica por qué tu posición sigue siendo superior
3. Añade un argumento nuevo que el oponente no consideró
"""
con_rebut_prompt = f"""
Contexto: Debate sobre "{question}"
Tu posición: {option_b}
Tu argumento anterior: {transcript[-1]['con']}
El oponente respondió:
{transcript[-1]['pro']}
REBATE sus argumentos. Para cada punto del oponente:
1. Reconoce si tiene algo de razón (honestidad intelectual)
2. Explica por qué tu posición sigue siendo superior
3. Añade un argumento nuevo que el oponente no consideró
"""
pro_rebuttal = await _agent_respond(pro_rebut_prompt, config.tools)
con_rebuttal = await _agent_respond(con_rebut_prompt, config.tools)
transcript.append({
"round": round_num,
"pro": pro_rebuttal,
"con": con_rebuttal
})
# Árbitro
debate_summary = "\n\n".join([
f"=== Ronda {t['round']} ===\n"
f"PRO ({option_a}):\n{t['pro']}\n\n"
f"CONTRA ({option_b}):\n{t['con']}"
for t in transcript
])
arbitro_prompt = f"""
Eres un ÁRBITRO imparcial. Has presenciado el siguiente debate:
Pregunta: {question}
{debate_summary}
Tu tarea:
1. Evalúa la calidad de los argumentos de cada lado (no tu preferencia personal)
2. Identifica qué argumentos fueron más sólidos y por qué
3. Determina qué opción ganó el debate basándote en la calidad argumentativa
4. Proporciona una recomendación final con justificación de 3-5 oraciones
Formato de respuesta:
GANADOR: [nombre de la opción]
JUSTIFICACIÓN: [3-5 oraciones explicando el razonamiento]
ARGUMENTOS CLAVE: [los 2-3 argumentos más decisivos]
ADVERTENCIAS: [condiciones bajo las cuales la decisión podría cambiar]
"""
arbitro_result = await _agent_respond(arbitro_prompt, config.tools)
return {
"question": question,
"option_a": option_a,
"option_b": option_b,
"arbitro_decision": arbitro_result,
"transcript": transcript
}
# Ejemplo: Debate de arquitectura
async def main():
result = await debate_pattern(
question="¿Qué base de datos usar para el sistema de sesiones de agentes?",
option_a="PostgreSQL con tabla de sesiones",
option_b="Redis con TTL automático",
config=DebateConfig(rounds=2, tools=["Bash"])
)
print("=== DECISIÓN DEL ÁRBITRO ===")
print(result["arbitro_decision"])
if __name__ == "__main__":
asyncio.run(main())
Debate en TypeScript
// debate-pattern.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface DebateRound {
round: number;
pro: string;
con: string;
}
interface DebateResult {
question: string;
optionA: string;
optionB: string;
arbiterDecision: string;
transcript: DebateRound[];
}
async function runDebateAgent(prompt: string): Promise<string> {
const results: string[] = [];
for await (const message of query(prompt, {
allowedTools: ["Bash"],
maxTurns: 10,
} as ClaudeCodeOptions)) {
if ("content" in message && message.content) {
results.push(String(message.content));
}
}
return results.join("\n");
}
async function debatePattern(
question: string,
optionA: string,
optionB: string,
rounds = 2
): Promise<DebateResult> {
const transcript: DebateRound[] = [];
// Ronda inicial
const proArg = await runDebateAgent(
`Defiende "${optionA}" vs "${optionB}" para: ${question}. 3 argumentos específicos.`
);
const conArg = await runDebateAgent(
`Defiende "${optionB}" vs "${optionA}" para: ${question}. 3 argumentos específicos.`
);
transcript.push({ round: 0, pro: proArg, con: conArg });
// Rondas de rebatimiento
for (let r = 1; r <= rounds; r++) {
const prev = transcript[transcript.length - 1];
const [proRebut, conRebut] = await Promise.all([
runDebateAgent(
`Tu posición: ${optionA}. Rebate: "${prev.con}". Añade argumento nuevo.`
),
runDebateAgent(
`Tu posición: ${optionB}. Rebate: "${prev.pro}". Añade argumento nuevo.`
),
]);
transcript.push({ round: r, pro: proRebut, con: conRebut });
}
// Árbitro
const summary = transcript
.map(
(t) =>
`Ronda ${t.round}: PRO: ${t.pro.slice(0, 300)}... CON: ${t.con.slice(0, 300)}...`
)
.join("\n\n");
const arbiterDecision = await runDebateAgent(
`Como árbitro imparcial, evalúa este debate sobre "${question}":\n${summary}\n\n` +
`Indica GANADOR, JUSTIFICACIÓN y ARGUMENTOS CLAVE.`
);
return { question, optionA, optionB, arbiterDecision, transcript };
}
export { debatePattern, DebateResult };
4. Patrón Ensemble
Múltiples Agentes Resolviendo el Mismo Problema
El Ensemble es útil cuando un solo agente puede tomar caminos incorrectos, pero múltiples agentes con los mismos recursos tienen baja probabilidad de cometer el mismo error.
flowchart TD
Task["Tarea"] --> |fork| A1["Agente 1\nSolución A"]
Task --> |fork| A2["Agente 2\nSolución B"]
Task --> |fork| A3["Agente 3\nSolución C"]
A1 --> Aggregator["Agregador"]
A2 --> Aggregator
A3 --> Aggregator
Aggregator --> Vote["Voting\n(mayoría decide)"]
Aggregator --> Combine["Combination\n(mejores partes)"]
Vote --> FinalResult["Resultado Final"]
Combine --> FinalResult
Implementación Python
# ensemble_pattern.py
import asyncio
from collections import Counter
from claude_code_sdk import query, ClaudeCodeOptions
async def _solve_independently(
task: str,
agent_id: int,
tools: list[str]
) -> tuple[int, str]:
"""Un agente del ensemble resuelve el problema."""
prompt = f"""
Tarea: {task}
Eres el Agente {agent_id} de un ensemble. Resuelve de forma independiente.
No asumas que otros agentes están resolviendo esto — da tu mejor solución.
"""
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return agent_id, "\n".join(results)
async def _vote_on_solutions(
task: str,
solutions: list[tuple[int, str]],
tools: list[str]
) -> str:
"""Agente árbitro que elige la mejor solución."""
solutions_text = "\n\n".join([
f"=== Solución del Agente {agent_id} ===\n{solution}"
for agent_id, solution in solutions
])
arbitro_prompt = f"""
Tarea original: {task}
Múltiples agentes propusieron estas soluciones:
{solutions_text}
Evalúa cada solución con estos criterios:
1. Correctitud técnica (40%)
2. Completitud (30%)
3. Calidad del código/explicación (20%)
4. Eficiencia (10%)
Selecciona LA MEJOR solución o, si es posible, combina las partes mejores de cada una.
Explica brevemente por qué seleccionaste/combinaste así.
Entrega la solución final completa.
"""
results = []
async for message in query(
prompt=arbitro_prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
async def ensemble_pattern(
task: str,
num_agents: int = 3,
tools: list[str] | None = None
) -> str:
"""
Ensemble: múltiples agentes resuelven en paralelo, árbitro elige la mejor.
Args:
task: Tarea a resolver
num_agents: Número de agentes en el ensemble
tools: Herramientas disponibles para los agentes
Returns:
La mejor solución del ensemble
"""
if tools is None:
tools = ["Bash"]
print(f"[Ensemble] Lanzando {num_agents} agentes en paralelo...")
# Ejecutar todos los agentes en paralelo
coroutines = [
_solve_independently(task, i + 1, tools)
for i in range(num_agents)
]
solutions = await asyncio.gather(*coroutines)
print(f"[Ensemble] {len(solutions)} soluciones recibidas, votando...")
# Agregar y seleccionar la mejor
best = await _vote_on_solutions(task, list(solutions), tools)
print(f"[Ensemble] Solución final seleccionada ({len(best)} chars)")
return best
# Ensemble con voting simple para respuestas cortas
async def ensemble_voting(
question: str,
num_agents: int = 5
) -> str:
"""
Ensemble con voting por mayoría para respuestas cortas (clasificación, etc.)
"""
async def single_answer(agent_id: int) -> str:
results = []
async for message in query(
prompt=f"{question}\n\nResponde en UNA sola línea con solo la respuesta, sin explicación.",
options=ClaudeCodeOptions(allowed_tools=[], max_turns=3)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results).strip()
answers = await asyncio.gather(*[
single_answer(i) for i in range(num_agents)
])
# Voting por mayoría
vote_count = Counter(
answer.lower().strip() for answer in answers if answer
)
winner, count = vote_count.most_common(1)[0]
print(f"[Ensemble Voting] Votos: {dict(vote_count)}")
print(f"[Ensemble Voting] Ganador: '{winner}' con {count}/{num_agents} votos")
return winner
5. Verificación Cruzada
Agente Generador + Agente Verificador
La verificación cruzada es fundamental cuando el costo de un error es alto: código de producción, migraciones de datos, configuración de seguridad.
flowchart LR
Task["Tarea"] --> Generator["Agente Generador\n(produce código/output)"]
Generator --> Output["Output v1"]
Output --> Verifier["Agente Verificador\n(ejecuta tests, revisa)"]
Verifier --> |"✓ Pasa verificación"| Result["Resultado Final"]
Verifier --> |"✗ Falla verificación\ncon detalles"| Generator
style Result fill:#0a0,color:#fff
style Verifier fill:#55f,color:#fff
Implementación Python
# cross_verification.py
import asyncio
from dataclasses import dataclass
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class VerificationResult:
passed: bool
issues: list[str]
suggestions: list[str]
confidence: float # 0.0 a 1.0
async def _generate(task: str, feedback: str | None = None) -> str:
"""Agente generador."""
prompt = task if not feedback else f"""
Tarea original: {task}
Tu output anterior fue rechazado con estas observaciones:
{feedback}
Genera una nueva versión corrigiendo TODOS los problemas señalados.
"""
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=["Read", "Edit", "Bash"],
max_turns=30
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
async def _verify(task: str, output: str, verification_criteria: list[str]) -> VerificationResult:
"""Agente verificador: ejecuta tests y revisa el output."""
criteria_text = "\n".join(f"- {c}" for c in verification_criteria)
verifier_prompt = f"""
Eres un VERIFICADOR riguroso. Tu trabajo es encontrar problemas, no aprobar fácilmente.
Tarea original: {task}
Output a verificar:
{output}
Criterios de verificación:
{criteria_text}
Para cada criterio:
1. Verifica explícitamente (ejecuta código, lee archivos, etc.)
2. Documenta el resultado: PASA o FALLA con detalles específicos
Si hay código, EJECUTA los tests y reporta los resultados reales.
Responde en este formato exacto:
RESULTADO_GLOBAL: APROBADO|RECHAZADO
CONFIANZA: [0.0-1.0]
CRITERIOS:
- [criterio]: PASA|FALLA - [detalle específico]
ISSUES:
- [lista de problemas específicos, si hay]
SUGERENCIAS:
- [mejoras sugeridas]
"""
results = []
async for message in query(
prompt=verifier_prompt,
options=ClaudeCodeOptions(
allowed_tools=["Read", "Bash"], # Solo lectura y ejecución
max_turns=20
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
verification_text = "\n".join(results)
# Parsear resultado
passed = "RESULTADO_GLOBAL: APROBADO" in verification_text
issues = []
suggestions = []
confidence = 0.5
for line in verification_text.split("\n"):
line = line.strip()
if line.startswith("- ") and "FALLA" in line:
issues.append(line[2:])
elif line.startswith("- ") and "PASA" not in line and "FALLA" not in line:
if "ISSUES:" in verification_text:
issues.append(line[2:])
if "CONFIANZA:" in line:
try:
confidence = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
return VerificationResult(
passed=passed,
issues=issues,
suggestions=suggestions,
confidence=confidence
)
async def cross_verification_pattern(
task: str,
verification_criteria: list[str],
max_attempts: int = 3
) -> tuple[str, list[VerificationResult]]:
"""
Patrón de verificación cruzada.
Returns:
(output_final, historial_de_verificaciones)
"""
verifications = []
last_feedback = None
for attempt in range(1, max_attempts + 1):
print(f"[CrossVerify] Intento {attempt}/{max_attempts}")
# Generar
output = await _generate(task, last_feedback)
# Verificar
result = await _verify(task, output, verification_criteria)
verifications.append(result)
print(f"[CrossVerify] {'✓ Aprobado' if result.passed else '✗ Rechazado'} "
f"(confianza: {result.confidence:.0%})")
if result.passed and result.confidence >= 0.8:
return output, verifications
# Preparar feedback para el siguiente intento
last_feedback = "\n".join([
"Problemas encontrados:",
*[f" - {issue}" for issue in result.issues],
])
print("[CrossVerify] Máximo de intentos alcanzado, retornando mejor resultado")
return output, verifications
6. Self-Improvement Loop
El Agente Aprende de Sus Errores
# self_improvement.py
import asyncio
import json
from dataclasses import dataclass, field, asdict
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class AgentMemory:
"""Memoria de sesión del agente con errores y aprendizajes."""
successful_patterns: list[str] = field(default_factory=list)
failed_patterns: list[str] = field(default_factory=list)
learned_rules: list[str] = field(default_factory=list)
iteration_count: int = 0
def to_context_string(self) -> str:
"""Convierte la memoria en texto para incluir en el prompt."""
if not any([self.successful_patterns, self.failed_patterns, self.learned_rules]):
return ""
parts = ["=== Tu historial de aprendizaje en esta sesión ==="]
if self.learned_rules:
parts.append("Reglas aprendidas:")
parts.extend(f" - {r}" for r in self.learned_rules[-5:]) # Últimas 5
if self.failed_patterns:
parts.append("Patrones que han fallado:")
parts.extend(f" - {p}" for p in self.failed_patterns[-3:]) # Últimos 3
if self.successful_patterns:
parts.append("Patrones exitosos:")
parts.extend(f" - {p}" for p in self.successful_patterns[-3:])
return "\n".join(parts)
async def _run_with_memory(
task: str,
memory: AgentMemory,
tools: list[str]
) -> str:
"""Ejecuta el agente con contexto de memoria."""
memory_context = memory.to_context_string()
prompt = f"""
{memory_context}
Tarea actual (intento {memory.iteration_count + 1}): {task}
Aplica lo aprendido de intentos anteriores.
Si encuentras un error, documéntalo claramente para mejorar en el próximo intento.
"""
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
async def _extract_learnings(
task: str,
output: str,
success: bool,
memory: AgentMemory,
tools: list[str]
) -> AgentMemory:
"""Extrae aprendizajes del intento y actualiza la memoria."""
learning_prompt = f"""
Analiza este intento de resolver una tarea:
Tarea: {task}
Output producido: {output[:500]}...
¿Fue exitoso? {success}
Extrae:
1. Si fue exitoso: ¿qué patrones funcionaron bien?
2. Si falló: ¿qué patrones deben evitarse?
3. ¿Qué regla general aprendiste?
Responde en JSON:
{{
"successful_pattern": "descripción breve si fue exitoso, null si no",
"failed_pattern": "descripción breve si falló, null si no",
"learned_rule": "regla general aprendida"
}}
"""
results = []
async for message in query(
prompt=learning_prompt,
options=ClaudeCodeOptions(allowed_tools=[], max_turns=5)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
raw = "\n".join(results)
try:
# Extraer JSON del response
import re
json_match = re.search(r'\{[^}]+\}', raw, re.DOTALL)
if json_match:
learnings = json.loads(json_match.group())
if learnings.get("successful_pattern"):
memory.successful_patterns.append(learnings["successful_pattern"])
if learnings.get("failed_pattern"):
memory.failed_patterns.append(learnings["failed_pattern"])
if learnings.get("learned_rule"):
memory.learned_rules.append(learnings["learned_rule"])
except (json.JSONDecodeError, AttributeError):
pass
memory.iteration_count += 1
return memory
async def self_improvement_loop(
task: str,
success_criteria: str,
max_iterations: int = 5,
tools: list[str] | None = None
) -> tuple[str, AgentMemory]:
"""
Loop de auto-mejora: el agente aprende de cada intento.
Returns:
(mejor_output, memoria_final)
"""
if tools is None:
tools = ["Read", "Bash"]
memory = AgentMemory()
best_output = ""
for iteration in range(max_iterations):
print(f"[SelfImprove] Iteración {iteration + 1}/{max_iterations}")
print(f"[SelfImprove] Reglas aprendidas hasta ahora: {len(memory.learned_rules)}")
output = await _run_with_memory(task, memory, tools)
# Verificar si cumple criterios
eval_prompt = f"""
Criterio de éxito: {success_criteria}
Output a evaluar: {output[:1000]}
¿Cumple el criterio? Responde solo: SI o NO
"""
eval_result = []
async for message in query(
prompt=eval_prompt,
options=ClaudeCodeOptions(allowed_tools=[], max_turns=3)
):
if hasattr(message, 'content') and message.content:
eval_result.append(str(message.content))
success = "SI" in "\n".join(eval_result).upper()
best_output = output
memory = await _extract_learnings(task, output, success, memory, tools)
if success:
print(f"[SelfImprove] ✓ Éxito en iteración {iteration + 1}")
break
return best_output, memory
7. Agentes Especializados con Routing
Router que Decide el Agente Más Apto
# specialist_routing.py
import asyncio
from dataclasses import dataclass
from enum import Enum
from claude_code_sdk import query, ClaudeCodeOptions
class AgentSpecialty(str, Enum):
PYTHON = "python"
TYPESCRIPT = "typescript"
SQL = "sql"
SECURITY = "security"
DEVOPS = "devops"
GENERAL = "general"
@dataclass
class Specialist:
specialty: AgentSpecialty
system_prompt: str
tools: list[str]
max_turns: int = 30
SPECIALISTS: dict[AgentSpecialty, Specialist] = {
AgentSpecialty.PYTHON: Specialist(
specialty=AgentSpecialty.PYTHON,
system_prompt="""Eres un experto en Python con 15 años de experiencia.
Conoces profundamente: asyncio, type hints, dataclasses, pydantic, fastapi,
testing con pytest, packaging, y mejores prácticas de la comunidad Python.""",
tools=["Read", "Edit", "Bash"]
),
AgentSpecialty.TYPESCRIPT: Specialist(
specialty=AgentSpecialty.TYPESCRIPT,
system_prompt="""Eres un experto en TypeScript con 10 años de experiencia.
Conoces profundamente: tipos avanzados, generics, decorators, Node.js, React,
build tools (vite, esbuild), testing con vitest, y el ecosistema npm.""",
tools=["Read", "Edit", "Bash"]
),
AgentSpecialty.SQL: Specialist(
specialty=AgentSpecialty.SQL,
system_prompt="""Eres un experto en SQL y bases de datos relacionales.
Conoces PostgreSQL, MySQL, SQLite en profundidad. Optimización de queries,
índices, transacciones, migraciones y diseño de schemas.""",
tools=["Read", "Bash"]
),
AgentSpecialty.SECURITY: Specialist(
specialty=AgentSpecialty.SECURITY,
system_prompt="""Eres un experto en seguridad de aplicaciones web.
Conoces OWASP Top 10, vulnerabilidades comunes, autenticación/autorización,
criptografía, secrets management y secure coding practices.""",
tools=["Read", "Bash"]
),
AgentSpecialty.DEVOPS: Specialist(
specialty=AgentSpecialty.DEVOPS,
system_prompt="""Eres un experto en DevOps y operaciones.
Conoces Docker, Kubernetes, CI/CD, monitoring, logging, IaC con Terraform,
AWS/GCP/Azure, y SRE practices.""",
tools=["Read", "Bash"]
),
}
async def _classify_task(task: str) -> AgentSpecialty:
"""Usa LLM para clasificar la tarea y elegir el especialista adecuado."""
specialties_list = "\n".join([
f"- {s.value}: {SPECIALISTS[s].system_prompt[:100]}..."
for s in AgentSpecialty if s != AgentSpecialty.GENERAL
])
classifier_prompt = f"""
Clasifica esta tarea para asignarla al especialista más adecuado:
Tarea: {task}
Especialistas disponibles:
{specialties_list}
- general: Para tareas que no encajan en ninguna especialidad
Responde SOLO con el nombre de la especialidad (una palabra):
python|typescript|sql|security|devops|general
"""
results = []
async for message in query(
prompt=classifier_prompt,
options=ClaudeCodeOptions(allowed_tools=[], max_turns=3)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
raw = "\n".join(results).strip().lower()
for specialty in AgentSpecialty:
if specialty.value in raw:
return specialty
return AgentSpecialty.GENERAL
async def run_specialist(task: str, specialty: AgentSpecialty) -> str:
"""Ejecuta el agente especialista para la tarea."""
specialist = SPECIALISTS.get(specialty)
if not specialist:
# Fallback a agente general
results = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"], max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
full_prompt = f"{specialist.system_prompt}\n\nTarea: {task}"
results = []
async for message in query(
prompt=full_prompt,
options=ClaudeCodeOptions(
allowed_tools=specialist.tools,
max_turns=specialist.max_turns
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
async def routed_specialist(task: str) -> tuple[str, AgentSpecialty]:
"""Router + especialista: clasifica y ejecuta con el experto correcto."""
print(f"[Router] Clasificando tarea: {task[:80]}...")
specialty = await _classify_task(task)
print(f"[Router] Especialista seleccionado: {specialty.value}")
result = await run_specialist(task, specialty)
return result, specialty
8. Comunicación Indirecta via Artefactos
Agentes que se Comunican por Archivos
# artifact_communication.py
import asyncio
import os
import json
import time
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def producer_agent(
task: str,
output_file: str,
tools: list[str] | None = None
) -> None:
"""
Agente productor: escribe resultados a un archivo para que
el consumidor los procese.
"""
if tools is None:
tools = ["Read", "Bash", "Write"]
prompt = f"""
{task}
Cuando termines, guarda tu resultado en el archivo: {output_file}
El formato debe ser JSON con esta estructura:
{{
"status": "completed",
"timestamp": <unix_timestamp>,
"result": <tu resultado>,
"metadata": {{}}
}}
"""
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
pass # El agente escribe directamente al archivo
async def consumer_agent(
input_file: str,
processing_task: str,
output_file: str,
poll_interval: float = 2.0,
timeout: float = 300.0
) -> bool:
"""
Agente consumidor: espera que el archivo aparezca y lo procesa.
Returns:
True si procesó exitosamente, False si timeout
"""
print(f"[Consumer] Esperando archivo: {input_file}")
deadline = time.time() + timeout
# Polling hasta que el archivo esté disponible
while time.time() < deadline:
if os.path.exists(input_file):
try:
with open(input_file) as f:
data = json.load(f)
if data.get("status") == "completed":
break
except (json.JSONDecodeError, KeyError):
pass
await asyncio.sleep(poll_interval)
else:
print(f"[Consumer] Timeout esperando {input_file}")
return False
print(f"[Consumer] Archivo recibido, procesando...")
prompt = f"""
{processing_task}
El input está en el archivo: {input_file}
Lee el archivo, procesa el contenido y guarda el resultado en: {output_file}
"""
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=["Read", "Bash"],
max_turns=20
)
):
pass
return True
async def artifact_pipeline(work_dir: str = "/tmp/agent_artifacts") -> None:
"""
Pipeline de agentes que se comunican solo via archivos.
No hay coordinador central.
"""
Path(work_dir).mkdir(parents=True, exist_ok=True)
analysis_file = f"{work_dir}/analysis.json"
review_file = f"{work_dir}/review.json"
report_file = f"{work_dir}/report.md"
# Los 3 agentes corren "independientemente"
# Solo se sincronizan por la existencia de los archivos
await asyncio.gather(
# Agente 1: Analiza el código
producer_agent(
task="Analiza el directorio src/ y encuentra todos los TODO comments",
output_file=analysis_file
),
# Agente 2: Espera el análisis y lo revisa
consumer_agent(
input_file=analysis_file,
processing_task="Prioriza los TODO comments por urgencia e impacto",
output_file=review_file
),
# Agente 3: Genera reporte final cuando la review esté lista
consumer_agent(
input_file=review_file,
processing_task="Genera un reporte markdown con los TODO priorizados",
output_file=report_file
),
)
print(f"[Pipeline] Reporte generado en: {report_file}")
9. Supervisión Humana Escalable
Dashboard de Aprobaciones Pendientes
# human_supervision.py
import asyncio
import uuid
import json
from dataclasses import dataclass, asdict, field
from typing import Callable, Awaitable
from enum import Enum
import redis.asyncio as redis
class ApprovalStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
TIMEOUT = "timeout"
@dataclass
class ApprovalRequest:
request_id: str
session_id: str
action_type: str
action_description: str
action_details: dict
risk_level: str # "low", "medium", "high", "critical"
requester: str
timeout_seconds: int = 300
status: ApprovalStatus = ApprovalStatus.PENDING
reviewer_comment: str = ""
def to_dict(self) -> dict:
d = asdict(self)
d["status"] = self.status.value
return d
class HumanApprovalGateway:
"""
Gateway para solicitar aprobación humana en acciones del agente.
Usa Redis para comunicación entre el agente y el dashboard.
"""
PENDING_QUEUE = "approvals:pending"
RESULTS_PREFIX = "approvals:result:"
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def request_approval(
self,
session_id: str,
action_type: str,
action_description: str,
action_details: dict,
risk_level: str = "medium",
timeout_seconds: int = 300
) -> ApprovalStatus:
"""
Solicita aprobación humana y espera la respuesta.
En acciones de alto riesgo, el agente se bloquea hasta que
un humano aprueba o rechaza.
"""
request_id = str(uuid.uuid4())[:8]
request = ApprovalRequest(
request_id=request_id,
session_id=session_id,
action_type=action_type,
action_description=action_description,
action_details=action_details,
risk_level=risk_level,
requester=f"agent:{session_id}",
timeout_seconds=timeout_seconds
)
# Publicar en queue
await self.redis.lpush(
self.PENDING_QUEUE,
json.dumps(request.to_dict())
)
# Notificar al dashboard (via pub/sub)
await self.redis.publish(
"approvals:new",
json.dumps({"request_id": request_id, "risk_level": risk_level})
)
print(f"[Approval] Esperando aprobación para: {action_description[:80]}")
# Esperar respuesta
deadline = asyncio.get_event_loop().time() + timeout_seconds
while asyncio.get_event_loop().time() < deadline:
result = await self.redis.get(f"{self.RESULTS_PREFIX}{request_id}")
if result:
data = json.loads(result)
status = ApprovalStatus(data["status"])
print(f"[Approval] Decisión recibida: {status.value}")
return status
await asyncio.sleep(2.0)
# Timeout: registrar y usar fallback
print(f"[Approval] Timeout esperando aprobación para {request_id}")
return ApprovalStatus.TIMEOUT
async def get_pending_approvals(self) -> list[dict]:
"""Obtiene todas las aprobaciones pendientes para el dashboard."""
items = await self.redis.lrange(self.PENDING_QUEUE, 0, -1)
return [json.loads(item) for item in items]
async def submit_decision(
self,
request_id: str,
approved: bool,
comment: str = ""
) -> None:
"""El humano toma una decisión desde el dashboard."""
status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED
await self.redis.setex(
f"{self.RESULTS_PREFIX}{request_id}",
3600,
json.dumps({"status": status.value, "comment": comment})
)
# ============================================================
# Hook para el SDK que solicita aprobación antes de acciones críticas
# ============================================================
def create_approval_hook(
gateway: HumanApprovalGateway,
session_id: str,
high_risk_tools: list[str] | None = None
) -> Callable:
"""
Crea un hook de PreToolUse que solicita aprobación para
herramientas de alto riesgo.
"""
if high_risk_tools is None:
high_risk_tools = ["Bash"] # Bash puede ejecutar cualquier comando
async def pre_tool_hook(tool_name: str, tool_input: dict) -> dict | None:
"""Intercepta llamadas a herramientas de alto riesgo."""
if tool_name not in high_risk_tools:
return None # Permitir sin aprobación
# Determinar nivel de riesgo basado en el comando
command = tool_input.get("command", "")
if any(dangerous in command for dangerous in ["rm -rf", "DROP TABLE", "DELETE FROM"]):
risk_level = "critical"
timeout = 600 # 10 minutos para decisiones críticas
else:
risk_level = "medium"
timeout = 300 # 5 minutos normal
status = await gateway.request_approval(
session_id=session_id,
action_type=f"tool:{tool_name}",
action_description=f"Ejecutar {tool_name}: {command[:100]}",
action_details=tool_input,
risk_level=risk_level,
timeout_seconds=timeout
)
if status == ApprovalStatus.APPROVED:
return None # Proceder con la herramienta
if status == ApprovalStatus.REJECTED:
return {
"error": "Acción rechazada por revisión humana",
"action": "skip_tool"
}
# Timeout: fallback conservador (no ejecutar)
return {
"error": "Timeout de aprobación: acción no ejecutada por seguridad",
"action": "skip_tool"
}
return pre_tool_hook
10. Sistema de Agentes con Memoria Compartida
Vector Database como Memoria Compartida
# shared_memory_agents.py
import asyncio
import json
import hashlib
from dataclasses import dataclass
from typing import Any
from claude_code_sdk import query, ClaudeCodeOptions
# Simulación de vector DB (en producción usar ChromaDB, Pinecone, Weaviate)
class SimpleVectorMemory:
"""
Memoria compartida simple basada en búsqueda por keywords.
En producción, reemplazar con ChromaDB o Pinecone para búsqueda semántica real.
"""
def __init__(self):
self._store: dict[str, dict] = {}
def store(
self,
content: str,
metadata: dict | None = None,
tags: list[str] | None = None
) -> str:
"""Almacena un fragmento de conocimiento."""
doc_id = hashlib.md5(content.encode()).hexdigest()[:8]
self._store[doc_id] = {
"id": doc_id,
"content": content,
"metadata": metadata or {},
"tags": tags or [],
}
return doc_id
def search(self, query_text: str, top_k: int = 5) -> list[dict]:
"""Búsqueda simple por keywords."""
query_words = set(query_text.lower().split())
scored = []
for doc in self._store.values():
doc_words = set(doc["content"].lower().split())
tag_words = set(" ".join(doc["tags"]).lower().split())
all_words = doc_words | tag_words
score = len(query_words & all_words) / max(len(query_words), 1)
if score > 0:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
return [doc for _, doc in scored[:top_k]]
async def agent_with_shared_memory(
task: str,
memory: SimpleVectorMemory,
agent_name: str = "agent",
tools: list[str] | None = None
) -> str:
"""
Agente que consulta y actualiza la memoria compartida antes y después
de ejecutar su tarea.
"""
if tools is None:
tools = ["Read", "Bash"]
# Paso 1: Recuperar conocimiento relevante de la memoria
relevant_docs = memory.search(task, top_k=3)
if relevant_docs:
memory_context = "=== Conocimiento previo relevante ===\n"
for doc in relevant_docs:
memory_context += f"\n--- {doc['id']} ---\n{doc['content'][:300]}\n"
else:
memory_context = ""
# Paso 2: Ejecutar tarea con contexto de memoria
full_prompt = f"""
{memory_context}
Tarea actual: {task}
Aplica el conocimiento previo si es relevante.
Si descubres algo nuevo o útil durante tu trabajo, inclúyelo al final del resultado
en una sección llamada "=== NUEVO CONOCIMIENTO ===".
"""
results = []
async for message in query(
prompt=full_prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
full_result = "\n".join(results)
# Paso 3: Extraer nuevo conocimiento y guardarlo en memoria compartida
if "=== NUEVO CONOCIMIENTO ===" in full_result:
new_knowledge_section = full_result.split("=== NUEVO CONOCIMIENTO ===")[1]
if new_knowledge_section.strip():
memory.store(
content=new_knowledge_section.strip(),
metadata={"agent": agent_name, "task_preview": task[:100]},
tags=task.split()[:5] # Primeras 5 palabras como tags
)
print(f"[Memory] {agent_name} guardó nuevo conocimiento")
return full_result
async def multi_agent_with_shared_memory(tasks: list[str]) -> list[str]:
"""
Múltiples agentes que comparten conocimiento acumulado.
El segundo agente se beneficia de lo que aprendió el primero.
"""
shared_memory = SimpleVectorMemory()
results = []
# Los agentes se ejecutan secuencialmente para que puedan compartir conocimiento
for i, task in enumerate(tasks):
print(f"\n[SharedMemory] Agente {i+1} procesando: {task[:60]}...")
result = await agent_with_shared_memory(
task=task,
memory=shared_memory,
agent_name=f"agent_{i+1}"
)
results.append(result)
print(f"[SharedMemory] Documentos en memoria: {len(shared_memory._store)}")
return results
# Ejemplo de uso
async def main():
tasks = [
"Analiza el archivo config.py y documenta todas las configuraciones",
"¿Cuáles configuraciones de config.py son críticas para seguridad?",
"Genera un checklist de validación para las configuraciones de seguridad",
]
results = await multi_agent_with_shared_memory(tasks)
for i, result in enumerate(results, 1):
print(f"\n=== Resultado Agente {i} ===")
print(result[:500])
if __name__ == "__main__":
asyncio.run(main())
Resumen del Capítulo
Los patrones avanzados expanden significativamente las capacidades de los sistemas multi-agente:
flowchart TB
subgraph Calidad["Para mejorar calidad"]
Reflection["Reflection\nGenerar → Criticar → Mejorar"]
CrossVerify["Verificación Cruzada\nGenerador + Verificador"]
Ensemble["Ensemble\nMúltiples → Voting"]
end
subgraph Decision["Para mejores decisiones"]
Debate["Debate\nPro vs Contra + Árbitro"]
Router["Routing\nClasificar → Especialista"]
end
subgraph Learning["Para aprender y adaptarse"]
SelfImprove["Self-Improvement\nAprender de errores"]
SharedMem["Memoria Compartida\nConocimiento acumulado"]
end
subgraph Coordination["Para coordinación flexible"]
Artifacts["Artefactos\nComunicación por archivos"]
HumanSuper["Supervisión Humana\nAprobación escalable"]
end
Cuándo usar cada patrón:
- Reflection: Código crítico, documentación técnica, análisis importantes
- Debate: Decisiones de arquitectura, trade-offs de tecnología
- Ensemble + Voting: Clasificaciones, respuestas cortas donde mayoría importa
- Verificación cruzada: Código de producción, migraciones de datos
- Self-improvement: Tareas repetitivas donde la calidad debe mejorar con el tiempo
- Routing: Sistemas con dominios claramente distintos
- Artefactos: Pipelines de procesamiento donde el orden importa pero no la velocidad
- Supervisión humana: Acciones irreversibles o de alto riesgo
- Memoria compartida: Q&A systems, sistemas que aprenden sobre una codebase
Próximo capítulo: Capítulo 20: SDK Internals y Extensiones — Cómo funciona el SDK por dentro y cómo extenderlo.
11. Patrón Critic-Actor Profundo
Más Allá del Reflection: Rúbricas Explícitas
El patrón Reflection básico tiene un critic que juzga de forma holística. El patrón Critic-Actor Profundo va más lejos: el critic usa una rúbrica con pesos que define criterios explícitos y puntuaciones. Esto hace el proceso transparente, reproducible y alineado con los objetivos del negocio.
sequenceDiagram
participant Actor
participant Critic
participant Rubrica as Rúbrica (pesos)
Actor->>Actor: Genera documentación v1
Actor->>Critic: Entrega documento
loop Hasta score >= umbral
Critic->>Rubrica: Evalúa contra criterios
Note over Critic,Rubrica: Completitud 40%<br/>Claridad 30%<br/>Ejemplos 20%<br/>Formato 10%
Rubrica->>Critic: Score por criterio
Critic->>Actor: Feedback estructurado con scores
Actor->>Actor: Mejora enfocada en criterios débiles
Actor->>Critic: Nueva versión
end
Critic->>Actor: APROBADO (score >= 0.85)
Implementación con Rúbrica Pesada
# critic_actor_deep.py
import asyncio
import json
import re
from dataclasses import dataclass, field
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class RubricCriterion:
name: str
weight: float # 0.0 a 1.0, la suma de todos debe ser 1.0
description: str
scoring_guide: str # Descripción de qué puntúa 0.0, 0.5, 1.0
@dataclass
class CriticScore:
criterion: str
score: float # 0.0 a 1.0
feedback: str
suggestions: list[str]
@dataclass
class IterationResult:
iteration: int
content: str
total_score: float
criterion_scores: list[CriticScore]
approved: bool
DOCUMENTATION_RUBRIC = [
RubricCriterion(
name="completitud",
weight=0.40,
description="¿La documentación cubre todos los aspectos necesarios?",
scoring_guide="0.0=falta información esencial, 0.5=cubre lo básico, 1.0=cobertura completa con edge cases"
),
RubricCriterion(
name="claridad",
weight=0.30,
description="¿Es clara y fácil de entender para el público objetivo?",
scoring_guide="0.0=confusa o ambigua, 0.5=comprensible con esfuerzo, 1.0=cristalina y directa"
),
RubricCriterion(
name="ejemplos",
weight=0.20,
description="¿Incluye ejemplos concretos y funcionales?",
scoring_guide="0.0=sin ejemplos, 0.5=ejemplos básicos, 1.0=ejemplos completos con contexto"
),
RubricCriterion(
name="formato",
weight=0.10,
description="¿Tiene estructura correcta, headings, y es scannable?",
scoring_guide="0.0=muro de texto, 0.5=estructura básica, 1.0=bien estructurado y escaneable"
),
]
def build_rubric_prompt(rubric: list[RubricCriterion]) -> str:
"""Construye el texto de la rúbrica para el prompt del critic."""
lines = ["=== RÚBRICA DE EVALUACIÓN ==="]
for criterion in rubric:
lines.append(
f"\n**{criterion.name.upper()}** (peso: {criterion.weight:.0%})\n"
f"Descripción: {criterion.description}\n"
f"Guía de puntuación: {criterion.scoring_guide}"
)
return "\n".join(lines)
async def run_actor(task: str, previous_content: str, feedback: str, tools: list[str]) -> str:
"""Agente actor: genera o mejora contenido basado en feedback."""
if previous_content:
prompt = f"""
Tarea: {task}
Tu versión anterior:
{previous_content}
Feedback del crítico:
{feedback}
Mejora tu contenido atendiendo ESPECÍFICAMENTE el feedback.
Para cada punto de feedback, aplica la corrección sugerida.
Produce la versión completa mejorada.
"""
else:
prompt = f"""
Tarea: {task}
Produce una primera versión completa y detallada.
Incluye ejemplos concretos y estructura clara.
"""
results: list[str] = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
):
if hasattr(message, "content") and message.content:
results.append(str(message.content))
return "\n".join(results)
async def run_critic(
task: str,
content: str,
rubric: list[RubricCriterion],
tools: list[str]
) -> tuple[float, list[CriticScore]]:
"""
Agente critic: evalúa el contenido contra la rúbrica.
Returns:
(total_score ponderado, lista de CriticScore por criterio)
"""
rubric_text = build_rubric_prompt(rubric)
criterion_names = [c.name for c in rubric]
critic_prompt = f"""
Eres un CRÍTICO RIGUROSO. Tu objetivo es mejorar la calidad, no aprobar fácilmente.
Tarea que debía cumplir el actor: {task}
Contenido a evaluar:
{content}
{rubric_text}
Para cada criterio, evalúa y proporciona:
1. Score de 0.0 a 1.0 (con decimales)
2. Justificación específica (no genérica)
3. Sugerencias concretas para mejorar
Responde en JSON exactamente:
{{
"evaluaciones": [
{{
"criterio": "<nombre del criterio>",
"score": <float 0.0-1.0>,
"feedback": "<justificación>",
"sugerencias": ["<sugerencia 1>", "<sugerencia 2>"]
}}
]
}}
Criterios: {criterion_names}
"""
results: list[str] = []
async for message in query(
prompt=critic_prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=10)
):
if hasattr(message, "content") and message.content:
results.append(str(message.content))
raw = "\n".join(results)
scores: list[CriticScore] = []
try:
json_match = re.search(r'\{.*\}', raw, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
criterion_map = {c.name: c for c in rubric}
for eval_data in data.get("evaluaciones", []):
scores.append(CriticScore(
criterion=eval_data["criterio"],
score=float(eval_data["score"]),
feedback=eval_data["feedback"],
suggestions=eval_data.get("sugerencias", [])
))
except (json.JSONDecodeError, KeyError):
# Fallback si el JSON no se puede parsear
for criterion in rubric:
scores.append(CriticScore(
criterion=criterion.name,
score=0.5,
feedback="No se pudo obtener evaluación estructurada",
suggestions=[]
))
# Calcular score ponderado
total_score = 0.0
weight_map = {c.name: c.weight for c in rubric}
for cs in scores:
total_score += cs.score * weight_map.get(cs.criterion, 0.0)
return total_score, scores
def build_feedback_text(scores: list[CriticScore], threshold: float = 0.7) -> str:
"""Construye el texto de feedback para el actor."""
weak_criteria = [cs for cs in scores if cs.score < threshold]
lines = ["=== FEEDBACK DEL CRÍTICO ==="]
for cs in weak_criteria:
lines.append(f"\n**{cs.criterion.upper()}** (score: {cs.score:.2f})")
lines.append(f"Problema: {cs.feedback}")
for suggestion in cs.suggestions:
lines.append(f" -> {suggestion}")
if not weak_criteria:
lines.append("Todos los criterios superan el umbral mínimo.")
return "\n".join(lines)
async def critic_actor_deep(
task: str,
rubric: list[RubricCriterion] | None = None,
approval_threshold: float = 0.85,
max_iterations: int = 4,
actor_tools: list[str] | None = None,
critic_tools: list[str] | None = None
) -> tuple[str, list[IterationResult]]:
"""
Patrón Critic-Actor Profundo con rúbrica pesada.
Returns:
(contenido_final, historial_de_iteraciones)
"""
if rubric is None:
rubric = DOCUMENTATION_RUBRIC
if actor_tools is None:
actor_tools = ["Read", "Bash"]
if critic_tools is None:
critic_tools = ["Read"]
history: list[IterationResult] = []
current_content = ""
feedback_text = ""
for iteration in range(1, max_iterations + 1):
print(f"\n[CriticActor] === Iteración {iteration}/{max_iterations} ===")
# Actor genera/mejora
current_content = await run_actor(task, current_content, feedback_text, actor_tools)
print(f"[CriticActor] Actor produjo {len(current_content)} chars")
# Critic evalúa con rúbrica
total_score, criterion_scores = await run_critic(task, current_content, rubric, critic_tools)
print(f"[CriticActor] Score total: {total_score:.3f} (umbral: {approval_threshold})")
for cs in criterion_scores:
status = "OK" if cs.score >= 0.7 else "MEJORA"
print(f" [{status}] {cs.criterion}: {cs.score:.2f}")
approved = total_score >= approval_threshold
history.append(IterationResult(
iteration=iteration,
content=current_content,
total_score=total_score,
criterion_scores=criterion_scores,
approved=approved
))
if approved:
print(f"[CriticActor] Aprobado con score {total_score:.3f}")
break
# Preparar feedback enfocado en criterios débiles
feedback_text = build_feedback_text(criterion_scores)
return current_content, history
# Ejemplo: documentación técnica de una API
async def main():
task = """
Genera documentación técnica para la función:
async def calculate_compound_interest(principal: float, rate: float, periods: int) -> float
La documentación debe ser útil para desarrolladores Python que integren esta función.
"""
content, history = await critic_actor_deep(
task=task,
approval_threshold=0.85,
max_iterations=3
)
print("\n=== RESULTADO FINAL ===")
print(f"Iteraciones: {len(history)}")
print(f"Score final: {history[-1].total_score:.3f}")
print(content[:500])
if __name__ == "__main__":
asyncio.run(main())
12. Patrón Constitutional AI para Agentes
Agente que se Auto-Audita contra Principios
El Constitutional AI (CAI) es una técnica donde el agente evalúa su propio output contra un conjunto de principios predefinidos antes de entregarlo. Esto reduce sesgos y garantiza alineación con valores organizacionales.
# constitutional_agent.py
import asyncio
from dataclasses import dataclass
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class ConstitutionalPrinciple:
name: str
description: str
check_prompt: str
severity: str # "critical", "major", "minor"
CODE_CONSTITUTION = [
ConstitutionalPrinciple(
name="no_secrets_hardcodeados",
description="El código no debe contener contraseñas, API keys o tokens hardcodeados.",
check_prompt="¿El código contiene credenciales, contraseñas, API keys o tokens hardcodeados (no en variables de entorno)? Responde SI o NO con justificación.",
severity="critical"
),
ConstitutionalPrinciple(
name="no_sql_injection",
description="Las queries SQL deben usar parámetros, nunca interpolación directa de strings.",
check_prompt="¿El código ejecuta queries SQL con f-strings o concatenación directa de variables? Responde SI o NO con justificación.",
severity="critical"
),
ConstitutionalPrinciple(
name="manejo_de_errores",
description="Las operaciones que pueden fallar deben tener manejo de errores explícito.",
check_prompt="¿Hay operaciones de red, archivos o bases de datos sin manejo de excepciones? Responde SI o NO con justificación.",
severity="major"
),
ConstitutionalPrinciple(
name="no_codigo_comentado",
description="El código no debe contener bloques de código comentado.",
check_prompt="¿Hay código Python/JavaScript comentado (no docstrings, sino código que fue desactivado)? Responde SI o NO.",
severity="minor"
),
]
@dataclass
class AuditResult:
principle: str
violation: bool
severity: str
explanation: str
async def audit_against_principle(
code: str,
principle: ConstitutionalPrinciple,
tools: list[str]
) -> AuditResult:
"""Audita el código contra un principio constitucional."""
audit_prompt = f"""
Principio a verificar: {principle.name}
Descripción: {principle.description}
Código a auditar:
{code}
{principle.check_prompt}
Justifica tu respuesta con referencias específicas al código.
"""
texts: list[str] = []
async for message in query(
prompt=audit_prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=5)
):
if hasattr(message, "content") and message.content:
texts.append(str(message.content))
response = "\n".join(texts).upper()
violation = "SI" in response and "NO" not in response[:20]
return AuditResult(
principle=principle.name,
violation=violation,
severity=principle.severity,
explanation="\n".join(texts)[:300]
)
async def constitutional_agent(
task: str,
constitution: list[ConstitutionalPrinciple] | None = None,
max_revision_cycles: int = 3,
tools: list[str] | None = None
) -> tuple[str, list[AuditResult]]:
"""
Agente con auto-auditoría constitucional.
Flujo: Generar → Auditar → Revisar si hay violaciones críticas → Retornar
Returns:
(código_final, resultados_de_auditoría)
"""
if constitution is None:
constitution = CODE_CONSTITUTION
if tools is None:
tools = ["Bash"]
audit_tools = [] # El auditor no necesita herramientas externas
# Paso 1: Generar el código
print("[Constitutional] Generando código inicial...")
gen_results: list[str] = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
):
if hasattr(message, "content") and message.content:
gen_results.append(str(message.content))
code = "\n".join(gen_results)
for cycle in range(max_revision_cycles):
print(f"[Constitutional] Ciclo de auditoría {cycle + 1}/{max_revision_cycles}")
# Paso 2: Auditar en paralelo contra todos los principios
audit_tasks = [
audit_against_principle(code, principle, audit_tools)
for principle in constitution
]
audit_results = await asyncio.gather(*audit_tasks)
violations = [r for r in audit_results if r.violation]
critical_violations = [r for r in violations if r.severity == "critical"]
for result in audit_results:
status = "VIOLACION" if result.violation else "OK"
print(f" [{status}] {result.principle} ({result.severity})")
# Paso 3: Si no hay violaciones críticas, aprobar
if not critical_violations:
print(f"[Constitutional] Aprobado. Violaciones menores: {len(violations) - len(critical_violations)}")
return code, list(audit_results)
# Paso 4: Corregir violaciones críticas
violations_text = "\n".join([
f"- {v.principle}: {v.explanation[:200]}"
for v in critical_violations
])
revision_prompt = f"""
Tu código generado tiene violaciones de seguridad críticas:
{violations_text}
Código original:
{code}
Corrige TODAS las violaciones críticas listadas. Produce el código corregido completo.
"""
rev_results: list[str] = []
async for message in query(
prompt=revision_prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=15)
):
if hasattr(message, "content") and message.content:
rev_results.append(str(message.content))
code = "\n".join(rev_results)
print("[Constitutional] Máximo de ciclos alcanzado")
return code, list(audit_results)
13. Patrón Speculative Execution
Ejecutar Múltiples Estrategias en Paralelo, Usar la Primera Exitosa
Cuando hay incertidumbre sobre qué estrategia resolverá el problema, ejecuta varias en paralelo y usa la que termine exitosamente primero. Las demás se cancelan.
flowchart LR
Task["Tarea de Refactoring"] --> |fork| S1["Estrategia 1:\nExtract Method"]
Task --> |fork| S2["Estrategia 2:\nDecompose Conditional"]
Task --> |fork| S3["Estrategia 3:\nReplace Temp with Query"]
S1 --> |termina primero| Race{{"Race\n(primero en completar)"}
S2 --> |todavía corriendo| Race
S3 --> |todavía corriendo| Race
Race --> |cancela| S2
Race --> |cancela| S3
Race --> Result["Resultado Final\n(Estrategia 1)"]
Implementación con asyncio
# speculative_execution.py
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Coroutine
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class StrategyResult:
strategy_name: str
output: str
success: bool
error: str | None = None
async def run_strategy(
strategy_name: str,
task: str,
strategy_hint: str,
tools: list[str],
max_turns: int = 25,
) -> StrategyResult:
"""Ejecuta una estrategia específica para resolver la tarea."""
prompt = f"""
Estrategia a usar: {strategy_hint}
Tarea: {task}
Aplica ESPECÍFICAMENTE la estrategia indicada, no otras.
Al terminar, verifica que la solución es correcta ejecutando tests si están disponibles.
"""
texts: list[str] = []
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=max_turns)
):
if hasattr(message, "content") and message.content:
texts.append(str(message.content))
output = "\n".join(texts)
success = "error" not in output.lower()[:100]
return StrategyResult(strategy_name=strategy_name, output=output, success=success)
except Exception as e:
return StrategyResult(
strategy_name=strategy_name,
output="",
success=False,
error=str(e)
)
async def speculative_execution(
task: str,
strategies: list[dict[str, str]],
tools: list[str] | None = None,
timeout_per_strategy: float = 120.0,
) -> StrategyResult:
"""
Ejecuta múltiples estrategias en paralelo.
Retorna la primera que termine exitosamente.
Cancela las demás.
Args:
task: Descripción de la tarea
strategies: Lista de dicts con 'name' y 'hint' para cada estrategia
tools: Herramientas disponibles
timeout_per_strategy: Timeout máximo en segundos
Returns:
El resultado de la estrategia ganadora
"""
if tools is None:
tools = ["Read", "Edit", "Bash"]
print(f"[Speculative] Lanzando {len(strategies)} estrategias en paralelo...")
async def strategy_with_timeout(strategy: dict) -> StrategyResult:
try:
return await asyncio.wait_for(
run_strategy(strategy["name"], task, strategy["hint"], tools),
timeout=timeout_per_strategy
)
except asyncio.TimeoutError:
return StrategyResult(
strategy_name=strategy["name"],
output="",
success=False,
error=f"Timeout después de {timeout_per_strategy}s"
)
# Crear todas las tareas
coroutines = [strategy_with_timeout(s) for s in strategies]
tasks = [asyncio.create_task(coro) for coro in coroutines]
winner: StrategyResult | None = None
try:
# Esperar la primera que termine exitosamente
for coro in asyncio.as_completed([asyncio.shield(t) for t in tasks]):
result = await coro
print(f"[Speculative] Estrategia '{result.strategy_name}' terminó: success={result.success}")
if result.success and winner is None:
winner = result
break # Tenemos un ganador
finally:
# Cancelar todas las tareas pendientes
for t in tasks:
if not t.done():
t.cancel()
print(f"[Speculative] Cancelando tarea pendiente...")
if winner is None:
# Ninguna fue exitosa, retornar la primera
completed = [t for t in tasks if t.done() and not t.cancelled()]
if completed:
winner = completed[0].result()
else:
winner = StrategyResult(
strategy_name="none",
output="Ninguna estrategia completó exitosamente",
success=False
)
print(f"[Speculative] Ganadora: '{winner.strategy_name}'")
return winner
# Ejemplo: 3 estrategias de refactoring en paralelo
async def main():
task = """
Refactoriza la función process_data() en src/processor.py.
La función tiene 150 líneas y hace demasiadas cosas.
El código refactorizado debe pasar todos los tests existentes.
"""
strategies = [
{
"name": "extract_method",
"hint": "Extrae submétodos con responsabilidad única. Cada método debe hacer exactamente una cosa."
},
{
"name": "decompose_conditional",
"hint": "Simplifica los condicionales complejos usando guard clauses y early returns."
},
{
"name": "pipeline_pattern",
"hint": "Convierte la función en un pipeline de transformaciones pequeñas encadenadas."
}
]
winner = await speculative_execution(task, strategies, timeout_per_strategy=90.0)
print(f"\nEstrategia ganadora: {winner.strategy_name}")
print(winner.output[:500])
if __name__ == "__main__":
asyncio.run(main())
14. Sistemas de Agentes Reactivos
Agentes que Reaccionan a Eventos del Filesystem
Los agentes reactivos no esperan instrucciones explícitas: monitorean el ambiente y actúan cuando algo cambia. Son ideales para CI/CD, pipelines de datos y monitoreo.
# reactive_agents.py
import asyncio
import os
import time
from pathlib import Path
from dataclasses import dataclass, field
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class FileEvent:
path: str
event_type: str # "created", "modified"
timestamp: float = field(default_factory=time.time)
class AgentFileWatcher(FileSystemEventHandler):
"""
File watcher que pone eventos en una cola async
para que el agente los procese.
"""
def __init__(self, event_queue: asyncio.Queue, patterns: list[str] | None = None):
self.event_queue = event_queue
self.patterns = patterns or ["*.py", "*.ts", "*.json"]
self._loop = asyncio.get_event_loop()
def _matches_pattern(self, path: str) -> bool:
from fnmatch import fnmatch
return any(fnmatch(os.path.basename(path), p) for p in self.patterns)
def on_created(self, event):
if not event.is_directory and self._matches_pattern(event.src_path):
file_event = FileEvent(path=event.src_path, event_type="created")
self._loop.call_soon_threadsafe(
self.event_queue.put_nowait, file_event
)
def on_modified(self, event):
if not event.is_directory and self._matches_pattern(event.src_path):
file_event = FileEvent(path=event.src_path, event_type="modified")
self._loop.call_soon_threadsafe(
self.event_queue.put_nowait, file_event
)
async def file_processor_agent(event: FileEvent, cwd: str) -> str:
"""Agente que procesa un archivo cuando es creado o modificado."""
if event.event_type == "created":
task = f"Nuevo archivo detectado: {event.path}. Revisa si tiene problemas obvios de calidad o seguridad."
else:
task = f"Archivo modificado: {event.path}. Verifica que los cambios no introducen regresiones."
texts: list[str] = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(
cwd=cwd,
allowed_tools=["Read"],
model="claude-haiku-4-5",
max_turns=5,
)
):
if hasattr(message, "content") and message.content:
texts.append(str(message.content))
return "\n".join(texts)
async def reactive_file_agent(
watch_dir: str,
patterns: list[str] | None = None,
duration_seconds: float = 60.0
) -> None:
"""
Agente reactivo que monitorea un directorio y procesa archivos.
Args:
watch_dir: Directorio a monitorear
patterns: Patrones de archivos a monitorear (glob)
duration_seconds: Tiempo de monitoreo
"""
event_queue: asyncio.Queue[FileEvent] = asyncio.Queue()
handler = AgentFileWatcher(event_queue, patterns)
observer = Observer()
observer.schedule(handler, watch_dir, recursive=True)
observer.start()
print(f"[Reactive] Monitoreando {watch_dir} por {duration_seconds}s...")
try:
deadline = time.time() + duration_seconds
while time.time() < deadline:
try:
event = await asyncio.wait_for(event_queue.get(), timeout=1.0)
print(f"[Reactive] Evento: {event.event_type} en {event.path}")
result = await file_processor_agent(event, watch_dir)
print(f"[Reactive] Análisis: {result[:200]}")
except asyncio.TimeoutError:
continue # No hay eventos, seguir esperando
finally:
observer.stop()
observer.join()
print("[Reactive] Monitoreo terminado")
# ─── Watchdog Pattern: Agente que supervisa a otro agente ──────
class AgentWatchdog:
"""
Supervisore de agentes: si el agente supervisado no produce
output en X segundos, lo considera colgado y lo reinicia.
"""
def __init__(self, timeout_seconds: float = 60.0, max_restarts: int = 3):
self.timeout_seconds = timeout_seconds
self.max_restarts = max_restarts
self._last_heartbeat = time.time()
self._restart_count = 0
def heartbeat(self) -> None:
"""El agente supervisado debe llamar a esto periódicamente."""
self._last_heartbeat = time.time()
def is_alive(self) -> bool:
"""True si el agente respondió recientemente."""
return time.time() - self._last_heartbeat < self.timeout_seconds
async def watch(self, agent_coro, task_description: str) -> str:
"""
Supervisa una corutina de agente con timeout y reintentos.
"""
for attempt in range(self.max_restarts + 1):
try:
result = await asyncio.wait_for(
agent_coro(task_description),
timeout=self.timeout_seconds
)
return result
except asyncio.TimeoutError:
self._restart_count += 1
print(f"[Watchdog] Timeout en intento {attempt + 1}/{self.max_restarts + 1}")
if attempt < self.max_restarts:
print(f"[Watchdog] Reiniciando agente...")
await asyncio.sleep(2.0)
continue
raise RuntimeError(f"Agente no respondió después de {self.max_restarts + 1} intentos")
# ─── Agente que monitorea logs en tiempo real ─────────────────
async def log_monitor_agent(log_file: str, check_interval: float = 5.0) -> None:
"""
Lee un archivo de log en tiempo real y actúa cuando detecta errores.
Similar a `tail -f` pero con inteligencia del agente.
"""
last_size = 0
print(f"[LogMonitor] Monitoreando {log_file}...")
while True:
await asyncio.sleep(check_interval)
try:
current_size = os.path.getsize(log_file)
except FileNotFoundError:
continue
if current_size <= last_size:
continue
# Hay nuevas líneas — leerlas
with open(log_file) as f:
f.seek(last_size)
new_content = f.read()
last_size = current_size
# Verificar si hay errores en las nuevas líneas
if "ERROR" in new_content or "CRITICAL" in new_content:
print(f"[LogMonitor] Error detectado en logs, analizando...")
texts: list[str] = []
async for message in query(
prompt=f"""
Analiza estas líneas de log que contienen errores:
{new_content[-2000:]}
Determina:
1. Tipo de error
2. Causa probable
3. Acción recomendada (¿alerta? ¿restart? ¿ignorar?)
""",
options=ClaudeCodeOptions(
allowed_tools=[],
model="claude-haiku-4-5",
max_turns=3
)
):
if hasattr(message, "content") and message.content:
texts.append(str(message.content))
analysis = "\n".join(texts)
print(f"[LogMonitor] Análisis: {analysis[:300]}")
15. Federación de Agentes
Múltiples Instancias del SDK en Diferentes Máquinas
La federación permite distribuir trabajo de agentes entre múltiples máquinas, coordinadas por un orquestador central. Es útil cuando la carga de trabajo supera lo que una instancia puede manejar.
# federation.py
import asyncio
import httpx
import uuid
from dataclasses import dataclass
from typing import Any
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class AgentNode:
"""Representa un nodo de agente en el cluster."""
node_id: str
host: str
port: int
capabilities: list[str]
current_load: int = 0
@property
def base_url(self) -> str:
return f"http://{self.host}:{self.port}"
class AgentCluster:
"""
Cluster de agentes distribuidos.
Cada nodo expone una API REST para recibir tareas.
"""
def __init__(self, nodes: list[AgentNode]):
self.nodes = nodes
def select_node(self, required_capabilities: list[str] | None = None) -> AgentNode | None:
"""Selecciona el nodo disponible más adecuado."""
candidates = self.nodes
if required_capabilities:
candidates = [
n for n in candidates
if all(cap in n.capabilities for cap in required_capabilities)
]
if not candidates:
return None
# Elegir el nodo con menor carga
return min(candidates, key=lambda n: n.current_load)
async def dispatch_task(
self,
task: str,
required_capabilities: list[str] | None = None,
timeout: float = 120.0
) -> dict[str, Any]:
"""Despacha una tarea al nodo más adecuado."""
node = self.select_node(required_capabilities)
if not node:
raise RuntimeError(f"No hay nodos disponibles para: {required_capabilities}")
node.current_load += 1
task_id = str(uuid.uuid4())[:8]
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
f"{node.base_url}/tasks",
json={"task_id": task_id, "prompt": task}
)
response.raise_for_status()
return response.json()
finally:
node.current_load = max(0, node.current_load - 1)
async def broadcast_task(
self,
task: str,
aggregation: str = "first_success"
) -> list[dict[str, Any]]:
"""
Envía la misma tarea a todos los nodos.
Útil para ensemble voting distribuido.
"""
tasks = [self.dispatch_task(task) for _ in self.nodes]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if isinstance(r, dict)]
# ─── Servidor de agente (FastAPI) ─────────────────────────────
# Este sería el código que corre en cada nodo del cluster
AGENT_SERVER_CODE = '''
# agent_server.py (corre en cada nodo)
from fastapi import FastAPI
from pydantic import BaseModel
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
app = FastAPI()
class TaskRequest(BaseModel):
task_id: str
prompt: str
tools: list[str] = ["Read", "Bash"]
max_turns: int = 20
@app.post("/tasks")
async def process_task(request: TaskRequest):
texts = []
async for message in query(
prompt=request.prompt,
options=ClaudeCodeOptions(
allowed_tools=request.tools,
max_turns=request.max_turns
)
):
if hasattr(message, "content") and message.content:
texts.append(str(message.content))
return {
"task_id": request.task_id,
"output": "\\n".join(texts),
"success": True
}
@app.get("/health")
async def health():
return {"status": "ok", "node_id": "node-1"}
'''
# Ejemplo de uso del cluster
async def main():
# Definir el cluster (en prod, estos valores vendrían de service discovery)
nodes = [
AgentNode("node-1", "agent-1.internal", 8001, ["python", "review"]),
AgentNode("node-2", "agent-2.internal", 8002, ["python", "typescript", "review"]),
AgentNode("node-3", "agent-3.internal", 8003, ["security", "review"]),
]
cluster = AgentCluster(nodes)
# Despachar tarea a nodo con capacidad de TypeScript
result = await cluster.dispatch_task(
task="Revisa el código TypeScript en src/ y reporta problemas de tipos.",
required_capabilities=["typescript"]
)
print(f"Resultado del nodo: {result}")
16. Agentes con Memoria a Largo Plazo
Memoria Episódica y Semántica con Vector Database
La memoria a largo plazo permite que un agente recuerde decisiones pasadas, aprendizajes del proyecto y contexto acumulado. Esto es fundamental para agentes que trabajan en proyectos a lo largo del tiempo.
flowchart TB
Query["Nueva tarea del agente"] --> Retrieval["Retrieval\n(búsqueda semántica)"]
subgraph VectorDB["Vector Database (ChromaDB / Qdrant)"]
EpisodicMem["Memoria Episódica\n(conversaciones pasadas)"]
SemanticMem["Memoria Semántica\n(conocimiento del dominio)"]
ProceduralMem["Memoria Procedimental\n(cómo hacer las cosas)"]
end
Retrieval --> EpisodicMem
Retrieval --> SemanticMem
Retrieval --> ProceduralMem
EpisodicMem --> |top-k relevantes| Context["Contexto enriquecido"]
SemanticMem --> |top-k relevantes| Context
ProceduralMem --> |top-k relevantes| Context
Context --> Agent["Agente con contexto\nde memoria"]
Agent --> Result["Resultado"]
Result --> |extrae nuevo conocimiento| Storage["Almacenamiento\nen Vector DB"]
Storage --> VectorDB
Implementación con ChromaDB
# long_term_memory.py
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from claude_code_sdk import query, ClaudeCodeOptions
# Para producción: pip install chromadb
# Aquí usamos una implementación simplificada para el ejemplo
try:
import chromadb
CHROMADB_AVAILABLE = True
except ImportError:
CHROMADB_AVAILABLE = False
@dataclass
class MemoryEntry:
content: str
memory_type: str # "episodic", "semantic", "procedural"
metadata: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class AgentMemoryStore:
"""
Almacén de memoria con búsqueda semántica.
Usa ChromaDB si está disponible, fallback a búsqueda por keywords.
"""
def __init__(self, project_id: str, persist_dir: str = "/tmp/agent_memory"):
self.project_id = project_id
self.persist_dir = persist_dir
self._entries: list[MemoryEntry] = []
if CHROMADB_AVAILABLE:
self._client = chromadb.PersistentClient(path=persist_dir)
self._collection = self._client.get_or_create_collection(
name=f"project_{project_id}",
metadata={"hnsw:space": "cosine"}
)
else:
print("[Memory] ChromaDB no disponible, usando búsqueda por keywords")
def store_episodic(
self,
conversation_summary: str,
task: str,
outcome: str,
tags: list[str] | None = None
) -> str:
"""Almacena un recuerdo episódico (resumen de conversación)."""
content = f"TAREA: {task}\nRESULTADO: {outcome}\nRESUMEN: {conversation_summary}"
entry = MemoryEntry(
content=content,
memory_type="episodic",
metadata={"task": task[:100], "tags": tags or []}
)
return self._store_entry(entry)
def store_semantic(
self,
knowledge: str,
category: str,
tags: list[str] | None = None
) -> str:
"""Almacena conocimiento semántico (hechos del dominio)."""
entry = MemoryEntry(
content=knowledge,
memory_type="semantic",
metadata={"category": category, "tags": tags or []}
)
return self._store_entry(entry)
def store_procedural(
self,
procedure: str,
use_case: str,
tags: list[str] | None = None
) -> str:
"""Almacena conocimiento procedimental (cómo hacer algo)."""
content = f"CASO DE USO: {use_case}\nPROCEDIMIENTO: {procedure}"
entry = MemoryEntry(
content=content,
memory_type="procedural",
metadata={"use_case": use_case[:100], "tags": tags or []}
)
return self._store_entry(entry)
def _store_entry(self, entry: MemoryEntry) -> str:
"""Almacena una entrada en la memoria."""
entry_id = f"{entry.memory_type}_{len(self._entries)}"
self._entries.append(entry)
if CHROMADB_AVAILABLE:
self._collection.add(
documents=[entry.content],
metadatas=[{**entry.metadata, "type": entry.memory_type, "timestamp": entry.timestamp}],
ids=[entry_id]
)
return entry_id
def retrieve(
self,
query_text: str,
top_k: int = 5,
memory_type: str | None = None
) -> list[MemoryEntry]:
"""
Recupera las memorias más relevantes para la query.
Usa búsqueda semántica con ChromaDB o keyword search como fallback.
"""
if CHROMADB_AVAILABLE:
where = {"type": memory_type} if memory_type else None
results = self._collection.query(
query_texts=[query_text],
n_results=top_k,
where=where
)
entries = []
for i, doc in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i]
entries.append(MemoryEntry(
content=doc,
memory_type=meta.get("type", "unknown"),
metadata=meta
))
return entries
else:
# Búsqueda por keywords como fallback
query_words = set(query_text.lower().split())
scored = []
for entry in self._entries:
if memory_type and entry.memory_type != memory_type:
continue
doc_words = set(entry.content.lower().split())
score = len(query_words & doc_words) / max(len(query_words), 1)
if score > 0:
scored.append((score, entry))
scored.sort(key=lambda x: x[0], reverse=True)
return [entry for _, entry in scored[:top_k]]
def build_memory_context(self, query_text: str, max_chars: int = 2000) -> str:
"""Construye un bloque de contexto de memoria para incluir en el prompt."""
relevant = self.retrieve(query_text, top_k=5)
if not relevant:
return ""
lines = ["=== MEMORIA RELEVANTE DEL PROYECTO ==="]
total_chars = 0
for entry in relevant:
content_preview = entry.content[:400]
if total_chars + len(content_preview) > max_chars:
break
lines.append(f"\n[{entry.memory_type.upper()}] {entry.metadata.get('timestamp', '')[:10]}")
lines.append(content_preview)
total_chars += len(content_preview)
return "\n".join(lines)
async def agent_with_long_term_memory(
task: str,
memory: AgentMemoryStore,
tools: list[str] | None = None
) -> str:
"""
Agente que usa memoria a largo plazo.
Antes de cada query, recupera memorias relevantes.
Después de cada query, extrae y almacena nuevo conocimiento.
"""
if tools is None:
tools = ["Read", "Bash"]
# Recuperar memoria relevante
memory_context = memory.build_memory_context(task)
prompt = f"""
{memory_context}
Tarea actual: {task}
Notas:
- Aplica el conocimiento previo si es relevante
- Si tomas decisiones de arquitectura o diseño, explica el razonamiento
- Al final de tu respuesta, incluye una sección "=== NUEVO CONOCIMIENTO ===" con
cualquier aprendizaje nuevo que debería recordarse para el futuro
"""
texts: list[str] = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, "content") and message.content:
texts.append(str(message.content))
full_output = "\n".join(texts)
# Extraer y almacenar nuevo conocimiento
if "=== NUEVO CONOCIMIENTO ===" in full_output:
knowledge_section = full_output.split("=== NUEVO CONOCIMIENTO ===")[1].strip()
if knowledge_section:
# Determinar tipo de conocimiento
if any(word in task.lower() for word in ["cómo", "pasos", "procedimiento", "proceso"]):
memory.store_procedural(
procedure=knowledge_section[:500],
use_case=task[:100],
tags=task.split()[:5]
)
else:
memory.store_episodic(
conversation_summary=knowledge_section[:500],
task=task[:100],
outcome="completado",
tags=task.split()[:5]
)
print("[Memory] Nuevo conocimiento almacenado")
return full_output
# Ejemplo completo: agente de desarrollo con memoria persistente
async def development_agent_session():
"""
Simulación de sesión de trabajo con memoria persistente.
El agente recuerda decisiones de sesiones anteriores.
"""
memory = AgentMemoryStore(project_id="mi_proyecto")
# Pre-cargar conocimiento del proyecto
memory.store_semantic(
knowledge="El proyecto usa FastAPI con SQLAlchemy. Las rutas están en src/api/routes/. Los modelos en src/models/.",
category="arquitectura",
tags=["fastapi", "sqlalchemy", "estructura"]
)
memory.store_semantic(
knowledge="Convención: todas las funciones deben tener type hints y docstrings. Tests con pytest-asyncio.",
category="convenciones",
tags=["convenciones", "testing"]
)
memory.store_procedural(
procedure="Para agregar una nueva ruta: 1) Crear endpoint en src/api/routes/{nombre}.py, 2) Registrar en src/api/__init__.py, 3) Agregar tests en tests/test_{nombre}.py",
use_case="agregar nueva ruta a la API",
tags=["ruta", "api", "nueva funcionalidad"]
)
# Sesión de trabajo
tasks = [
"Agrega un endpoint GET /users/{user_id}/profile que retorne el perfil del usuario",
"¿Cuál es la convención de naming para los tests en este proyecto?",
"Crea el test para el endpoint que acabas de crear",
]
for task in tasks:
print(f"\n{'='*60}")
print(f"TAREA: {task}")
print('='*60)
result = await agent_with_long_term_memory(task, memory)
print(f"RESULTADO: {result[:400]}...")
if __name__ == "__main__":
asyncio.run(development_agent_session())
TypeScript: Implementación con Qdrant
// long-term-memory.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
// En producción: npm install @qdrant/js-client-rest
// Aquí simulamos la interfaz
interface MemoryEntry {
id: string;
content: string;
type: "episodic" | "semantic" | "procedural";
tags: string[];
timestamp: string;
score?: number;
}
class KeywordMemoryStore {
private entries: MemoryEntry[] = [];
private counter = 0;
store(entry: Omit<MemoryEntry, "id" | "timestamp">): string {
const id = `${entry.type}_${++this.counter}`;
this.entries.push({
...entry,
id,
timestamp: new Date().toISOString(),
});
return id;
}
retrieve(queryText: string, topK = 5, type?: string): MemoryEntry[] {
const queryWords = new Set(queryText.toLowerCase().split(/\s+/));
const scored = this.entries
.filter((e) => !type || e.type === type)
.map((entry) => {
const docWords = new Set(
(entry.content + " " + entry.tags.join(" ")).toLowerCase().split(/\s+/)
);
const intersection = [...queryWords].filter((w) => docWords.has(w));
const score = intersection.length / Math.max(queryWords.size, 1);
return { ...entry, score };
})
.filter((e) => (e.score ?? 0) > 0)
.sort((a, b) => (b.score ?? 0) - (a.score ?? 0))
.slice(0, topK);
return scored;
}
buildContext(queryText: string, maxChars = 2000): string {
const relevant = this.retrieve(queryText);
if (!relevant.length) return "";
const lines = ["=== MEMORIA RELEVANTE ==="];
let totalChars = 0;
for (const entry of relevant) {
const preview = entry.content.slice(0, 400);
if (totalChars + preview.length > maxChars) break;
lines.push(`\n[${entry.type.toUpperCase()}] ${entry.timestamp.slice(0, 10)}`);
lines.push(preview);
totalChars += preview.length;
}
return lines.join("\n");
}
}
async function agentWithLongTermMemory(
task: string,
memory: KeywordMemoryStore,
tools: string[] = ["Read", "Bash"]
): Promise<string> {
const memoryContext = memory.buildContext(task);
const prompt = `${memoryContext}\n\nTarea: ${task}\n\nSi descubres conocimiento nuevo útil, inclúyelo en "=== NUEVO CONOCIMIENTO ===".`;
const texts: string[] = [];
for await (const message of query(prompt, {
allowedTools: tools,
maxTurns: 20,
} as ClaudeCodeOptions)) {
if ("content" in message && message.content) {
texts.push(String(message.content));
}
}
const output = texts.join("\n");
if (output.includes("=== NUEVO CONOCIMIENTO ===")) {
const knowledge = output.split("=== NUEVO CONOCIMIENTO ===")[1].trim();
if (knowledge) {
memory.store({
content: knowledge.slice(0, 500),
type: "episodic",
tags: task.split(/\s+/).slice(0, 5),
});
console.log("[Memory] Nuevo conocimiento almacenado");
}
}
return output;
}
export { KeywordMemoryStore, agentWithLongTermMemory };
Resumen Expandido del Capítulo
flowchart TB
subgraph Calidad["Para mejorar calidad"]
Reflection["Reflection\nGenerar → Criticar → Mejorar"]
CriticActor["Critic-Actor Profundo\nRúbrica con pesos"]
CrossVerify["Verificación Cruzada\nGenerador + Verificador"]
Constitutional["Constitutional AI\nAuto-auditoría de principios"]
Ensemble["Ensemble\nMúltiples → Voting"]
end
subgraph Decision["Para mejores decisiones"]
Debate["Debate\nPro vs Contra + Árbitro"]
Router["Routing\nClasificar → Especialista"]
Speculative["Speculative Execution\nRace de estrategias"]
end
subgraph Learning["Para aprender y adaptarse"]
SelfImprove["Self-Improvement\nAprender de errores"]
SharedMem["Memoria Compartida\nConocimiento acumulado"]
LongTerm["Memoria a Largo Plazo\nVector DB + Retrieval"]
end
subgraph Coordination["Para coordinación flexible"]
Artifacts["Artefactos\nComunicación por archivos"]
Reactive["Agentes Reactivos\nEvent-driven"]
Federation["Federación\nCluster distribuido"]
HumanSuper["Supervisión Humana\nAprobación escalable"]
end
Guía de selección de patrón:
| Situación | Patrón recomendado |
|---|---|
| Documentación técnica que debe ser excelente | Critic-Actor Profundo con rúbrica |
| Código que debe cumplir políticas de seguridad | Constitutional AI |
| Decisiones de arquitectura con trade-offs | Debate |
| Múltiples enfoques, usar el más rápido | Speculative Execution |
| Archivos que se crean automáticamente | Agentes Reactivos |
| Proyecto con historia de decisiones | Memoria a Largo Plazo |
| Alta carga de trabajo de review | Federación de Agentes |
| Aprobación de acciones irreversibles | Supervisión Humana |
Próximo capítulo: Capítulo 20: SDK Internals y Extensiones — Cómo funciona el SDK por dentro y cómo extenderlo.