Capítulo 16: Integración con Frameworks Web

Por: Artiko
claudeagent-sdkfastapiexpressnextjshonointegracion

Capítulo 16: Integración con Frameworks Web

El Claude Agent SDK está diseñado para ejecutarse en entornos Python y Node.js, lo que lo hace compatible con prácticamente cualquier framework web moderno. Sin embargo, integrar un agente en una API web presenta desafíos únicos: los agentes pueden tardar segundos o minutos, mientras que una petición HTTP tradicional espera una respuesta rápida.

Este capítulo cubre los patrones de integración correctos para cada framework, con soluciones para el problema de la latencia.


1. Arquitectura de Integración

El problema fundamental: agentes lentos en HTTP

Una petición HTTP típica debe responder en menos de 30 segundos (límite de muchos proxies y load balancers). Un agente puede tardar varios minutos para tareas complejas. Esta incompatibilidad exige estrategias específicas.

graph TD
    subgraph Sync["Síncrono - Para tareas rápidas menos de 10s"]
        C1[Cliente HTTP] -->|"POST /agent/query"| A1[API]
        A1 -->|"Ejecuta agente"| SDK1[SDK]
        SDK1 -->|"Resultado"| A1
        A1 -->|"200 OK + resultado"| C1
    end

    subgraph SSE["SSE - Para tareas largas con progreso"]
        C2[Cliente HTTP] -->|"GET /agent/stream"| A2[API]
        A2 -->|"Inicia agente"| SDK2[SDK]
        SDK2 -->|"Evento 1: tool_use"| A2
        A2 -->|"data: {tool_use}"| C2
        SDK2 -->|"Evento 2: texto"| A2
        A2 -->|"data: {texto}"| C2
        SDK2 -->|"Resultado final"| A2
        A2 -->|"data: {done}"| C2
    end

    subgraph Queue["Queue - Para tareas en background"]
        C3[Cliente HTTP] -->|"POST /agent/job"| A3[API]
        A3 -->|"201 + job_id"| C3
        A3 -->|"Encolar tarea"| Q[Queue Redis/RabbitMQ]
        Q -->|"Consumir"| W[Worker]
        W -->|"Ejecutar agente"| SDK3[SDK]
        W -->|"POST job_id + result"| WH[Webhook del cliente]
    end

Mapeando sesiones de agente a sesiones web

El SDK crea una nueva “sesión” en cada llamada a query(). Para contexto persistente entre peticiones del mismo usuario, debes gestionar el historial de conversación explícitamente:

Python:

from claude_code_sdk import query, ClaudeCodeOptions
from typing import Optional
import json

class AgentSessionManager:
    """Gestiona sesiones de agente persistentes para usuarios web."""

    def __init__(self):
        # En producción, usar Redis para persistencia
        self._sessions: dict[str, dict] = {}

    def get_or_create(self, session_id: str) -> dict:
        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "history": [],
                "workspace": f"/tmp/workspace-{session_id}",
                "created_at": __import__('time').time(),
            }
        return self._sessions[session_id]

    def add_message(self, session_id: str, role: str, content: str):
        session = self.get_or_create(session_id)
        session["history"].append({"role": role, "content": content})
        # Mantener solo los últimos 10 mensajes para evitar contextos enormes
        if len(session["history"]) > 10:
            session["history"] = session["history"][-10:]

    def get_context(self, session_id: str) -> str:
        """Genera un resumen del contexto previo para incluir en el prompt."""
        session = self.get_or_create(session_id)
        if not session["history"]:
            return ""

        lines = ["Contexto previo de esta sesión:"]
        for msg in session["history"][-5:]:  # Solo los últimos 5
            lines.append(f"[{msg['role']}]: {msg['content'][:100]}")
        return "\n".join(lines)

session_manager = AgentSessionManager()

TypeScript:

interface AgentSession {
  history: Array<{ role: string; content: string }>;
  workspace: string;
  createdAt: number;
}

class AgentSessionManager {
  private sessions: Map<string, AgentSession> = new Map();

  getOrCreate(sessionId: string): AgentSession {
    if (!this.sessions.has(sessionId)) {
      this.sessions.set(sessionId, {
        history: [],
        workspace: `/tmp/workspace-${sessionId}`,
        createdAt: Date.now(),
      });
    }
    return this.sessions.get(sessionId)!;
  }

  addMessage(sessionId: string, role: string, content: string): void {
    const session = this.getOrCreate(sessionId);
    session.history.push({ role, content });
    if (session.history.length > 10) {
      session.history = session.history.slice(-10);
    }
  }

  getContext(sessionId: string): string {
    const session = this.getOrCreate(sessionId);
    if (session.history.length === 0) return "";

    const lines = ["Contexto previo:"];
    for (const msg of session.history.slice(-5)) {
      lines.push(`[${msg.role}]: ${msg.content.slice(0, 100)}`);
    }
    return lines.join("\n");
  }
}

const sessionManager = new AgentSessionManager();

2. FastAPI (Python)

FastAPI es el framework Python más popular para APIs modernas. Su soporte nativo de async lo hace ideal para agentes.

Configuración base del proyecto

pip install fastapi uvicorn python-jose[cryptography] slowapi

Endpoint síncrono (para tareas cortas, menos de 30s)

# main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio

app = FastAPI(title="Claude Agent API", version="1.0.0")

class AgentRequest(BaseModel):
    task: str
    session_id: str | None = None

class AgentResponse(BaseModel):
    result: str
    cost_usd: float
    turns: int

@app.post("/agent/query", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    """
    Endpoint síncrono. El cliente espera hasta que el agente termina.
    Adecuado solo para tareas que tardan menos de 30 segundos.
    """
    try:
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool", "GrepTool"],
            max_turns=10,
            model="claude-haiku-3-5",
        )

        result = ""
        total_cost = 0.0
        total_turns = 0

        async for message in query(prompt=request.task, options=options):
            if hasattr(message, 'cost_usd') and message.cost_usd:
                total_cost += message.cost_usd
            if hasattr(message, 'num_turns'):
                total_turns = message.num_turns
            if hasattr(message, 'result'):
                result = message.result

        return AgentResponse(
            result=result,
            cost_usd=total_cost,
            turns=total_turns
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Endpoint SSE para streaming

Server-Sent Events (SSE) permite enviar eventos del servidor al cliente en tiempo real sobre una conexión HTTP:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from claude_code_sdk import query, ClaudeCodeOptions
import json
import asyncio

@app.get("/agent/stream")
async def stream_agent(task: str, request: Request):
    """
    Endpoint SSE. El cliente recibe eventos a medida que el agente trabaja.
    Ideal para tareas largas donde el usuario quiere ver progreso.
    """
    async def event_generator():
        try:
            options = ClaudeCodeOptions(
                allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
                max_turns=20,
                model="claude-sonnet-4-5",
            )

            async for message in query(prompt=task, options=options):
                # Verificar si el cliente desconectó
                if await request.is_disconnected():
                    break

                event_data = None

                if hasattr(message, 'type'):
                    if message.type == "assistant":
                        # Texto del asistente
                        content = getattr(message, 'message', None)
                        if content and hasattr(content, 'content'):
                            for block in content.content:
                                if hasattr(block, 'text') and block.text:
                                    event_data = {
                                        "type": "text",
                                        "content": block.text
                                    }

                    elif message.type == "tool_use":
                        # Notificar qué herramienta se usa
                        event_data = {
                            "type": "tool_use",
                            "tool": getattr(message, 'tool_name', 'unknown'),
                        }

                    elif message.type == "tool_result":
                        event_data = {
                            "type": "tool_result",
                            "tool": getattr(message, 'tool_name', 'unknown'),
                        }

                    elif message.type == "result":
                        event_data = {
                            "type": "done",
                            "result": message.result,
                            "cost_usd": getattr(message, 'cost_usd', 0),
                        }

                if event_data:
                    yield f"data: {json.dumps(event_data)}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
        finally:
            yield "data: {\"type\": \"close\"}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Importante para nginx
            "Access-Control-Allow-Origin": "*",
        }
    )

Endpoint WebSocket bidireccional

WebSocket permite comunicación bidireccional: el cliente puede enviar nuevas instrucciones mientras el agente trabaja:

from fastapi import WebSocket, WebSocketDisconnect
from claude_code_sdk import query, ClaudeCodeOptions
import json
import asyncio

@app.websocket("/agent/ws/{session_id}")
async def websocket_agent(websocket: WebSocket, session_id: str):
    """
    WebSocket bidireccional.
    El cliente puede enviar mensajes mientras el agente trabaja.
    """
    await websocket.accept()

    try:
        while True:
            # Esperar mensaje del cliente
            raw_message = await websocket.receive_text()
            request = json.loads(raw_message)

            if request.get("type") == "task":
                task = request["task"]

                await websocket.send_json({
                    "type": "status",
                    "message": "Agente iniciado"
                })

                options = ClaudeCodeOptions(
                    allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
                    max_turns=20,
                )

                async for message in query(prompt=task, options=options):
                    if hasattr(message, 'type'):
                        if message.type == "assistant":
                            content = getattr(message, 'message', None)
                            if content and hasattr(content, 'content'):
                                for block in content.content:
                                    if hasattr(block, 'text') and block.text:
                                        await websocket.send_json({
                                            "type": "text",
                                            "content": block.text
                                        })

                        elif message.type == "result":
                            await websocket.send_json({
                                "type": "done",
                                "result": message.result,
                            })

            elif request.get("type") == "cancel":
                await websocket.send_json({"type": "cancelled"})
                break

    except WebSocketDisconnect:
        print(f"[WS] Cliente desconectado: {session_id}")
    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})

Background tasks con FastAPI

Para tareas que toman más de 30 segundos, usa background tasks:

from fastapi import BackgroundTasks
from fastapi import FastAPI
from pydantic import BaseModel
import uuid
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions

# Almacenamiento de jobs (en producción, usar Redis)
jobs: dict[str, dict] = {}

class JobRequest(BaseModel):
    task: str
    webhook_url: str | None = None

class JobStatus(BaseModel):
    job_id: str
    status: str  # "pending" | "running" | "completed" | "failed"
    result: str | None = None
    error: str | None = None

@app.post("/agent/jobs", status_code=201)
async def create_job(request: JobRequest, background_tasks: BackgroundTasks):
    """Crea un job en background. Retorna inmediatamente con job_id."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "pending", "result": None, "error": None}

    background_tasks.add_task(run_agent_job, job_id, request.task, request.webhook_url)

    return {"job_id": job_id, "status": "pending"}

@app.get("/agent/jobs/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """Consulta el estado de un job."""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job no encontrado")
    job = jobs[job_id]
    return JobStatus(job_id=job_id, **job)

async def run_agent_job(job_id: str, task: str, webhook_url: str | None):
    """Función que corre en background."""
    import httpx

    jobs[job_id]["status"] = "running"

    try:
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
            max_turns=30,
            model="claude-sonnet-4-5",
        )

        result = ""
        async for message in query(prompt=task, options=options):
            if hasattr(message, 'result'):
                result = message.result

        jobs[job_id]["status"] = "completed"
        jobs[job_id]["result"] = result

        # Notificar via webhook si fue configurado
        if webhook_url:
            async with httpx.AsyncClient() as client:
                await client.post(webhook_url, json={
                    "job_id": job_id,
                    "status": "completed",
                    "result": result,
                })

    except Exception as e:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(e)

        if webhook_url:
            async with httpx.AsyncClient() as client:
                await client.post(webhook_url, json={
                    "job_id": job_id,
                    "status": "failed",
                    "error": str(e),
                })

Rate limiting con slowapi

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/agent/query")
@limiter.limit("10/hour")  # 10 queries por hora por IP
async def rate_limited_query(request: Request, body: AgentRequest):
    # ... implementación del endpoint
    pass

Autenticación con JWT en FastAPI

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import os

security = HTTPBearer()

def verify_jwt(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """Dependencia de FastAPI para verificar JWT."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.environ["JWT_SECRET"],
            algorithms=["HS256"]
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expirado")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Token inválido")

@app.post("/agent/query")
@limiter.limit("20/hour")
async def authenticated_query(
    request: Request,
    body: AgentRequest,
    user: dict = Depends(verify_jwt)
):
    """Endpoint con autenticación JWT."""
    user_id = user["sub"]
    print(f"[AUDIT] Query de usuario: {user_id}")

    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool"],
        max_turns=10,
    )

    result = ""
    async for message in query(prompt=body.task, options=options):
        if hasattr(message, 'result'):
            result = message.result

    return {"result": result, "user": user_id}

API completa de agente con FastAPI

# api/main.py - Ejemplo completo
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from claude_code_sdk import query, ClaudeCodeOptions
import json, uuid, asyncio, os
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI(title="Claude Agent API")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

jobs: dict = {}

class QueryRequest(BaseModel):
    task: str
    model: str = "claude-haiku-3-5"
    max_turns: int = 10

@app.get("/health")
async def health():
    return {"status": "ok"}

@app.post("/agent/query")
@limiter.limit("20/hour")
async def sync_query(request: Request, body: QueryRequest):
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool", "GrepTool"],
        max_turns=min(body.max_turns, 20),
        model=body.model,
    )
    result = ""
    async for message in query(prompt=body.task, options=options):
        if hasattr(message, 'result'):
            result = message.result
    return {"result": result}

@app.get("/agent/stream")
async def stream_query(task: str, request: Request):
    async def gen():
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool", "GrepTool"],
            max_turns=20,
        )
        async for message in query(prompt=task, options=options):
            if await request.is_disconnected():
                break
            if hasattr(message, 'result'):
                yield f"data: {json.dumps({'type':'done','result':message.result})}\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")

@app.post("/agent/jobs", status_code=201)
async def create_job(body: QueryRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "pending"}
    background_tasks.add_task(
        run_job, job_id, body.task, body.model, body.max_turns
    )
    return {"job_id": job_id}

@app.get("/agent/jobs/{job_id}")
async def get_job(job_id: str):
    if job_id not in jobs:
        raise HTTPException(404, "Job no encontrado")
    return jobs[job_id]

async def run_job(job_id: str, task: str, model: str, max_turns: int):
    jobs[job_id]["status"] = "running"
    try:
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
            max_turns=max_turns,
            model=model,
        )
        result = ""
        async for message in query(prompt=task, options=options):
            if hasattr(message, 'result'):
                result = message.result
        jobs[job_id] = {"status": "completed", "result": result}
    except Exception as e:
        jobs[job_id] = {"status": "failed", "error": str(e)}

3. Flask (Python)

Flask es más simple que FastAPI. Para agentes, necesitas Flask con soporte async:

pip install flask[async] flask-sse

Integración básica con Flask

# app.py
from flask import Flask, request, jsonify, Response, stream_with_context
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
import json

app = Flask(__name__)

def run_async(coro):
    """Ejecutar coroutine en Flask (que es síncrono por defecto)."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

@app.route("/agent/query", methods=["POST"])
def agent_query():
    """Endpoint síncrono para Flask."""
    data = request.get_json()
    task = data.get("task", "")

    async def _run():
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool"],
            max_turns=10,
            model="claude-haiku-3-5",
        )
        result = ""
        async for message in query(prompt=task, options=options):
            if hasattr(message, 'result'):
                result = message.result
        return result

    try:
        result = run_async(_run())
        return jsonify({"result": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

SSE con Flask

@app.route("/agent/stream")
def agent_stream():
    """SSE con Flask."""
    task = request.args.get("task", "")

    def generate():
        async def _run():
            options = ClaudeCodeOptions(
                allowed_tools=["View", "GlobTool"],
                max_turns=15,
            )
            async for message in query(prompt=task, options=options):
                if hasattr(message, 'type'):
                    if message.type == "result":
                        yield f"data: {json.dumps({'type':'done','result':message.result})}\n\n"

        # Ejecutar el async generator en un loop síncrono
        loop = asyncio.new_event_loop()
        try:
            agen = _run()
            while True:
                try:
                    event = loop.run_until_complete(agen.__anext__())
                    yield event
                except StopAsyncIteration:
                    break
        finally:
            loop.close()

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

Sesiones Flask mapeadas a sesiones de agente

from flask import Flask, session, request, jsonify
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio

app = Flask(__name__)
app.secret_key = "tu-secret-key-muy-segura"

# Historial de conversación por session_id (en producción: Redis)
conversation_histories: dict[str, list] = {}

@app.route("/agent/chat", methods=["POST"])
def chat():
    """Endpoint que mantiene contexto entre mensajes."""
    flask_session_id = session.get("session_id")
    if not flask_session_id:
        import uuid
        flask_session_id = str(uuid.uuid4())
        session["session_id"] = flask_session_id

    data = request.get_json()
    user_message = data.get("message", "")

    # Obtener historial previo
    history = conversation_histories.get(flask_session_id, [])

    # Construir prompt con contexto
    context = "\n".join([
        f"[{h['role']}]: {h['content'][:200]}"
        for h in history[-5:]  # Solo últimos 5 mensajes
    ])

    full_prompt = f"{context}\n\n[usuario]: {user_message}" if context else user_message

    async def _run():
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool"],
            max_turns=10,
        )
        result = ""
        async for message in query(prompt=full_prompt, options=options):
            if hasattr(message, 'result'):
                result = message.result
        return result

    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(_run())
    finally:
        loop.close()

    # Actualizar historial
    history.append({"role": "usuario", "content": user_message})
    history.append({"role": "agente", "content": result})
    conversation_histories[flask_session_id] = history[-20:]  # Mantener últimos 20

    return jsonify({
        "result": result,
        "session_id": flask_session_id,
    })

4. Express.js (TypeScript)

Express es el framework Node.js más utilizado. Con TypeScript y el SDK, la integración es directa.

Setup del proyecto

npm init -y
npm install express @anthropic-ai/claude-code-sdk
npm install -D typescript @types/express @types/node ts-node

Endpoint básico

// src/server.ts
import express, { Request, Response, NextFunction } from "express";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const app = express();
app.use(express.json());

interface QueryBody {
  task: string;
  model?: string;
}

app.post("/agent/query", async (req: Request<{}, {}, QueryBody>, res: Response) => {
  const { task, model = "claude-haiku-3-5" } = req.body;

  if (!task) {
    return res.status(400).json({ error: "task es requerido" });
  }

  try {
    const options: ClaudeCodeOptions = {
      allowedTools: ["View", "GlobTool", "GrepTool"],
      maxTurns: 10,
      model,
    };

    let result = "";
    for await (const message of query({ prompt: task, options })) {
      if (message.type === "result") {
        result = message.result;
      }
    }

    res.json({ result });
  } catch (error) {
    res.status(500).json({ error: (error as Error).message });
  }
});

app.listen(3000, () => console.log("Servidor en http://localhost:3000"));

SSE con Express

import express, { Request, Response } from "express";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

app.get("/agent/stream", async (req: Request, res: Response) => {
  const task = req.query.task as string;

  if (!task) {
    return res.status(400).json({ error: "task es requerido" });
  }

  // Configurar headers SSE
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("Access-Control-Allow-Origin", "*");
  res.flushHeaders();

  const sendEvent = (data: object) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  try {
    const options: ClaudeCodeOptions = {
      allowedTools: ["View", "GlobTool", "GrepTool", "Edit"],
      maxTurns: 20,
    };

    for await (const message of query({ prompt: task, options })) {
      // Verificar si el cliente se desconectó
      if (req.socket.destroyed) break;

      if (message.type === "assistant") {
        const content = (message as any).message?.content ?? [];
        for (const block of content) {
          if (block.type === "text" && block.text) {
            sendEvent({ type: "text", content: block.text });
          }
        }
      } else if (message.type === "result") {
        sendEvent({ type: "done", result: message.result });
      }
    }
  } catch (error) {
    sendEvent({ type: "error", message: (error as Error).message });
  } finally {
    res.end();
  }
});

WebSocket con la librería ws

import { WebSocketServer, WebSocket } from "ws";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import { createServer } from "http";
import express from "express";

const expressApp = express();
const server = createServer(expressApp);
const wss = new WebSocketServer({ server });

wss.on("connection", (ws: WebSocket, req) => {
  console.log(`[WS] Cliente conectado desde ${req.socket.remoteAddress}`);

  ws.on("message", async (rawData) => {
    let parsed: { type: string; task?: string };

    try {
      parsed = JSON.parse(rawData.toString());
    } catch {
      ws.send(JSON.stringify({ type: "error", message: "JSON inválido" }));
      return;
    }

    if (parsed.type === "task" && parsed.task) {
      const task = parsed.task;

      ws.send(JSON.stringify({ type: "status", message: "Iniciando agente..." }));

      try {
        const options: ClaudeCodeOptions = {
          allowedTools: ["View", "GlobTool", "GrepTool", "Edit"],
          maxTurns: 25,
        };

        for await (const message of query({ prompt: task, options })) {
          if (ws.readyState !== WebSocket.OPEN) break;

          if (message.type === "assistant") {
            const content = (message as any).message?.content ?? [];
            for (const block of content) {
              if (block.type === "text" && block.text) {
                ws.send(JSON.stringify({ type: "text", content: block.text }));
              }
            }
          } else if (message.type === "result") {
            ws.send(JSON.stringify({ type: "done", result: message.result }));
          }
        }
      } catch (error) {
        ws.send(JSON.stringify({ type: "error", message: (error as Error).message }));
      }
    }
  });

  ws.on("close", () => console.log("[WS] Cliente desconectado"));
});

server.listen(3000, () => console.log("Servidor en puerto 3000"));

Middleware de autenticación

import { Request, Response, NextFunction } from "express";
import jwt from "jsonwebtoken";

interface AuthenticatedRequest extends Request {
  user?: { sub: string; role: string };
}

function authMiddleware(req: AuthenticatedRequest, res: Response, next: NextFunction): void {
  const authHeader = req.headers.authorization;

  if (!authHeader?.startsWith("Bearer ")) {
    res.status(401).json({ error: "Token requerido" });
    return;
  }

  const token = authHeader.slice(7);

  try {
    const payload = jwt.verify(token, process.env.JWT_SECRET!) as {
      sub: string;
      role: string;
    };
    req.user = payload;
    next();
  } catch (error) {
    res.status(401).json({ error: "Token inválido o expirado" });
  }
}

// Aplicar a rutas protegidas
app.post("/agent/query", authMiddleware, async (req: AuthenticatedRequest, res: Response) => {
  console.log(`[AUDIT] Query de usuario: ${req.user?.sub}`);
  // ... implementación
});

Error handling middleware

import { Request, Response, NextFunction } from "express";

// Debe ir al final de todos los middlewares
app.use((error: Error, req: Request, res: Response, next: NextFunction) => {
  console.error(`[ERROR] ${req.method} ${req.path}:`, error.message);

  if (error.name === "ValidationError") {
    return res.status(400).json({ error: error.message });
  }

  if (error.name === "UnauthorizedError") {
    return res.status(401).json({ error: "No autorizado" });
  }

  // Error genérico: no exponer detalles internos
  res.status(500).json({
    error: "Error interno del servidor",
    requestId: req.headers["x-request-id"],
  });
});

5. Hono (TypeScript/Edge)

Hono es un framework ultraligero compatible con edge runtimes (Cloudflare Workers, Deno Deploy, Vercel Edge). Es significativamente más rápido que Express para APIs simples.

Por qué Hono para agentes

Setup básico de Hono

npm create hono@latest my-agent-api
# Seleccionar: nodejs (para servidor), cloudflare-workers (para edge)

SSE con Hono

// src/index.ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const app = new Hono();

app.get("/agent/stream", (c) => {
  const task = c.req.query("task");

  if (!task) {
    return c.json({ error: "task es requerido" }, 400);
  }

  return streamSSE(c, async (stream) => {
    try {
      const options: ClaudeCodeOptions = {
        allowedTools: ["View", "GlobTool", "GrepTool"],
        maxTurns: 15,
        model: "claude-sonnet-4-5",
      };

      for await (const message of query({ prompt: task, options })) {
        if (message.type === "assistant") {
          const content = (message as any).message?.content ?? [];
          for (const block of content) {
            if (block.type === "text" && block.text) {
              await stream.writeSSE({
                data: JSON.stringify({ type: "text", content: block.text }),
              });
            }
          }
        } else if (message.type === "result") {
          await stream.writeSSE({
            data: JSON.stringify({ type: "done", result: message.result }),
            event: "done",
          });
        }
      }
    } catch (error) {
      await stream.writeSSE({
        data: JSON.stringify({ type: "error", message: (error as Error).message }),
        event: "error",
      });
    }
  });
});

app.post("/agent/query", async (c) => {
  const { task, model = "claude-haiku-3-5" } = await c.req.json<{
    task: string;
    model?: string;
  }>();

  const options: ClaudeCodeOptions = {
    allowedTools: ["View", "GlobTool"],
    maxTurns: 10,
    model,
  };

  let result = "";
  for await (const message of query({ prompt: task, options })) {
    if (message.type === "result") result = message.result;
  }

  return c.json({ result });
});

export default app;

Deploy en Cloudflare Workers

En Cloudflare Workers no hay filesystem real. El agente puede procesar datos en memoria pero no puede leer archivos del disco. Casos de uso válidos:

// worker.ts - Para Cloudflare Workers
import { Hono } from "hono";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const app = new Hono<{ Bindings: { ANTHROPIC_API_KEY: string } }>();

app.post("/analyze", async (c) => {
  const { code, question } = await c.req.json<{
    code: string;
    question: string;
  }>();

  // En edge: los datos vienen en el request, no del filesystem
  const options: ClaudeCodeOptions = {
    allowedTools: [], // Sin herramientas de filesystem en edge
    maxTurns: 5,
    model: "claude-haiku-3-5",
  };

  const prompt = `
Analiza el siguiente código y responde: ${question}

CÓDIGO:
\`\`\`
${code}
\`\`\`
  `;

  let result = "";
  for await (const message of query({ prompt, options })) {
    if (message.type === "result") result = message.result;
  }

  return c.json({ result });
});

export default app;

Limitaciones en edge y cómo manejarlas

graph TD
    A[Request al Worker Edge] -->|"¿Necesita filesystem?"| B{Decisión}
    B -->|No - procesa datos en memoria| C[Hono en Edge]
    B -->|Sí - necesita leer archivos| D[Redirect a servidor Node.js]

    C -->|Resultado directo| E[Response al cliente]
    D -->|Proxy a servidor con filesystem| F[API en Node.js con filesystem]
    F -->|Resultado| E

6. Next.js (TypeScript)

Next.js 14+ con App Router ofrece soporte nativo para streaming y Server Actions que se integran perfectamente con el SDK.

API Routes con streaming response

// app/api/agent/route.ts
import { NextRequest, NextResponse } from "next/server";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

export const runtime = "nodejs"; // Importante: no edge, necesita Node.js

export async function POST(req: NextRequest): Promise<Response> {
  const { task } = await req.json();

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      try {
        const options: ClaudeCodeOptions = {
          allowedTools: ["View", "GlobTool", "GrepTool"],
          maxTurns: 15,
          model: "claude-sonnet-4-5",
        };

        for await (const message of query({ prompt: task, options })) {
          if (message.type === "assistant") {
            const content = (message as any).message?.content ?? [];
            for (const block of content) {
              if (block.type === "text" && block.text) {
                const event = `data: ${JSON.stringify({
                  type: "text",
                  content: block.text,
                })}\n\n`;
                controller.enqueue(encoder.encode(event));
              }
            }
          } else if (message.type === "result") {
            const event = `data: ${JSON.stringify({
              type: "done",
              result: message.result,
            })}\n\n`;
            controller.enqueue(encoder.encode(event));
          }
        }
      } catch (error) {
        const event = `data: ${JSON.stringify({
          type: "error",
          message: (error as Error).message,
        })}\n\n`;
        controller.enqueue(encoder.encode(event));
      } finally {
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

Server Actions con agente

// app/actions/agent.ts
"use server";

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

export async function runAgentAction(
  task: string
): Promise<{ result?: string; error?: string }> {
  "use server";

  try {
    const options: ClaudeCodeOptions = {
      allowedTools: ["View", "GlobTool"],
      maxTurns: 10,
      model: "claude-haiku-3-5",
    };

    let result = "";
    for await (const message of query({ prompt: task, options })) {
      if (message.type === "result") {
        result = message.result;
      }
    }

    return { result };
  } catch (error) {
    return { error: (error as Error).message };
  }
}

Componente React con streaming

// app/components/AgentChat.tsx
"use client";

import { useState, useRef } from "react";

interface Message {
  role: "user" | "agent";
  content: string;
}

export function AgentChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState("");
  const [isLoading, setIsLoading] = useState(false);
  const [streamingContent, setStreamingContent] = useState("");
  const abortRef = useRef<AbortController | null>(null);

  const sendMessage = async () => {
    if (!input.trim() || isLoading) return;

    const userMessage = input.trim();
    setInput("");
    setIsLoading(true);
    setStreamingContent("");

    setMessages((prev) => [...prev, { role: "user", content: userMessage }]);

    abortRef.current = new AbortController();

    try {
      const response = await fetch("/api/agent", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ task: userMessage }),
        signal: abortRef.current.signal,
      });

      if (!response.body) throw new Error("No response body");

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulated = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split("\n");

        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const data = JSON.parse(line.slice(6));

          if (data.type === "text") {
            accumulated += data.content;
            setStreamingContent(accumulated);
          } else if (data.type === "done") {
            setMessages((prev) => [
              ...prev,
              { role: "agent", content: data.result },
            ]);
            setStreamingContent("");
          }
        }
      }
    } catch (error) {
      if ((error as Error).name !== "AbortError") {
        setMessages((prev) => [
          ...prev,
          { role: "agent", content: `Error: ${(error as Error).message}` },
        ]);
      }
    } finally {
      setIsLoading(false);
      setStreamingContent("");
    }
  };

  return (
    <div className="flex flex-col h-screen max-w-2xl mx-auto p-4">
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`p-3 rounded-lg ${
              msg.role === "user"
                ? "bg-blue-100 ml-12"
                : "bg-gray-100 mr-12"
            }`}
          >
            <span className="font-bold text-xs text-gray-500">
              {msg.role === "user" ? "Tú" : "Agente"}
            </span>
            <p className="mt-1 whitespace-pre-wrap">{msg.content}</p>
          </div>
        ))}

        {streamingContent && (
          <div className="bg-gray-100 mr-12 p-3 rounded-lg">
            <span className="font-bold text-xs text-gray-500">Agente</span>
            <p className="mt-1 whitespace-pre-wrap">{streamingContent}</p>
            <span className="animate-pulse"></span>
          </div>
        )}
      </div>

      <div className="flex gap-2">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === "Enter" && !e.shiftKey && sendMessage()}
          placeholder="Escribe tu tarea..."
          className="flex-1 border rounded-lg px-3 py-2"
          disabled={isLoading}
        />
        <button
          onClick={sendMessage}
          disabled={isLoading || !input.trim()}
          className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
        >
          {isLoading ? "..." : "Enviar"}
        </button>
        {isLoading && (
          <button
            onClick={() => abortRef.current?.abort()}
            className="bg-red-500 text-white px-4 py-2 rounded-lg"
          >
            Cancelar
          </button>
        )}
      </div>
    </div>
  );
}

App Router vs Pages Router

CaracterísticaApp Router (Next.js 13+)Pages Router (legacy)
StreamingNativo con ReadableStreamRequiere custom response
Server ActionsNativoNo disponible
RuntimeEdge o Node.js por archivoGlobal
RecomendaciónUsar para proyectos nuevosMantener en proyectos existentes

7. Queue-based Integration

Para tareas que pueden tardar minutos, la integración via queue es la más robusta.

Arquitectura con BullMQ (Node.js)

sequenceDiagram
    participant W as Web App
    participant API as API Server
    participant Q as Redis Queue
    participant WK as Worker
    participant SDK as Claude SDK
    participant WH as Webhook

    W->>API: POST /agent/jobs {task}
    API->>Q: bull.add(job)
    API-->>W: 201 {job_id}

    loop Polling opcional
        W->>API: GET /agent/jobs/{job_id}
        API-->>W: {status: "pending"}
    end

    Q->>WK: Consume job
    WK->>SDK: query(task)
    SDK-->>WK: Resultado
    WK->>API: Actualizar job status
    WK->>WH: POST resultado al webhook
    WH-->>W: Notificación de completado

Implementación con BullMQ

// queue/worker.ts
import { Worker, Queue, Job } from "bullmq";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import axios from "axios";

interface AgentJobData {
  jobId: string;
  task: string;
  model: string;
  webhookUrl?: string;
  userId: string;
}

// Conexión a Redis
const connection = { host: "localhost", port: 6379 };

// Cola de jobs
export const agentQueue = new Queue<AgentJobData>("agent-jobs", { connection });

// Worker que procesa los jobs
const worker = new Worker<AgentJobData>(
  "agent-jobs",
  async (job: Job<AgentJobData>) => {
    const { task, model, webhookUrl, userId, jobId } = job.data;

    console.log(`[WORKER] Procesando job ${jobId} para usuario ${userId}`);

    const options: ClaudeCodeOptions = {
      allowedTools: ["View", "GlobTool", "GrepTool", "Edit"],
      maxTurns: 30,
      model,
    };

    // Reportar progreso al job
    await job.updateProgress(10);

    let result = "";
    let turns = 0;

    for await (const message of query({ prompt: task, options })) {
      if (message.type === "result") {
        result = message.result;
      }
      // Actualizar progreso según turns
      turns++;
      const progress = Math.min(10 + (turns * 3), 90);
      await job.updateProgress(progress);
    }

    await job.updateProgress(100);

    // Notificar via webhook
    if (webhookUrl) {
      try {
        await axios.post(webhookUrl, {
          jobId,
          status: "completed",
          result,
          userId,
        });
      } catch (webhookError) {
        console.error(`[WORKER] Error en webhook: ${webhookError}`);
        // No fallar el job por error de webhook
      }
    }

    return { result, turns };
  },
  {
    connection,
    concurrency: 5, // Máximo 5 jobs en paralelo
    limiter: {
      max: 10,       // Máximo 10 jobs por...
      duration: 60000, // ...por minuto (rate limit)
    },
  }
);

worker.on("completed", (job) => {
  console.log(`[WORKER] Job ${job.id} completado`);
});

worker.on("failed", (job, error) => {
  console.error(`[WORKER] Job ${job?.id} fallido: ${error.message}`);
});

API que encola los jobs

// api/jobs.ts
import { Router, Request, Response } from "express";
import { agentQueue } from "../queue/worker";
import { v4 as uuidv4 } from "uuid";

const router = Router();
const jobStatuses: Map<string, object> = new Map();

router.post("/jobs", async (req: Request, res: Response) => {
  const { task, model = "claude-sonnet-4-5", webhookUrl } = req.body;
  const userId = (req as any).user?.sub ?? "anonymous";

  const jobId = uuidv4();

  const job = await agentQueue.add("agent-task", {
    jobId,
    task,
    model,
    webhookUrl,
    userId,
  }, {
    attempts: 3,           // Reintentar hasta 3 veces si falla
    backoff: {
      type: "exponential",
      delay: 5000,         // Espera exponencial entre reintentos
    },
    removeOnComplete: 100, // Mantener solo los últimos 100 jobs completados
    removeOnFail: 50,
  });

  res.status(201).json({
    jobId,
    queueJobId: job.id,
    status: "queued",
    estimatedWaitMs: await agentQueue.getWaitingCount() * 30000,
  });
});

router.get("/jobs/:jobId", async (req: Request, res: Response) => {
  const { jobId } = req.params;

  const jobs = await agentQueue.getJobs(["waiting", "active", "completed", "failed"]);
  const job = jobs.find((j) => j.data.jobId === jobId);

  if (!job) {
    return res.status(404).json({ error: "Job no encontrado" });
  }

  const state = await job.getState();
  const progress = job.progress;

  res.json({
    jobId,
    status: state,
    progress,
    result: state === "completed" ? job.returnvalue?.result : undefined,
    error: state === "failed" ? job.failedReason : undefined,
    createdAt: new Date(job.timestamp).toISOString(),
  });
});

export { router as jobsRouter };

Celery (Python) + Redis/RabbitMQ

# tasks.py
from celery import Celery
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
import httpx

celery_app = Celery(
    "agent_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    result_expires=3600,
    task_track_started=True,
    worker_concurrency=5,
)

@celery_app.task(bind=True, max_retries=3)
def run_agent_task(self, task: str, model: str, webhook_url: str | None, user_id: str):
    """Tarea Celery que ejecuta el agente."""
    self.update_state(state="RUNNING", meta={"progress": 0})

    async def _run():
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
            max_turns=30,
            model=model,
        )

        result = ""
        async for message in query(prompt=task, options=options):
            if hasattr(message, 'result'):
                result = message.result

        return result

    try:
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(_run())
        loop.close()

        # Notificar via webhook
        if webhook_url:
            loop = asyncio.new_event_loop()
            async def _webhook():
                async with httpx.AsyncClient() as client:
                    await client.post(webhook_url, json={
                        "task_id": self.request.id,
                        "status": "completed",
                        "result": result,
                        "user_id": user_id,
                    })
            loop.run_until_complete(_webhook())
            loop.close()

        return {"status": "completed", "result": result}

    except Exception as exc:
        self.retry(exc=exc, countdown=2 ** self.request.retries * 5)

# API Flask que encola tareas
@app.route("/agent/jobs", methods=["POST"])
def create_celery_job():
    data = request.get_json()
    task = run_agent_task.delay(
        task=data["task"],
        model=data.get("model", "claude-sonnet-4-5"),
        webhook_url=data.get("webhook_url"),
        user_id=data.get("user_id", "anonymous"),
    )
    return jsonify({"task_id": task.id, "status": "queued"}), 201

@app.route("/agent/jobs/<task_id>")
def get_celery_job(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return jsonify({
        "task_id": task_id,
        "status": result.state,
        "result": result.result if result.ready() else None,
    })

8. GraphQL

GraphQL ofrece una API más expresiva para sistemas complejos donde los clientes necesitan control sobre qué datos reciben.

Resolver que llama al agente (strawberry - Python)

# graphql_schema.py
import strawberry
from strawberry.types import Info
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio

@strawberry.type
class AgentResult:
    result: str
    cost_usd: float
    turns: int

@strawberry.type
class AgentJob:
    job_id: str
    status: str
    result: str | None = None
    error: str | None = None

@strawberry.type
class Query:
    @strawberry.field
    async def run_agent(self, task: str, model: str = "claude-haiku-3-5") -> AgentResult:
        """Resolver síncrono que ejecuta el agente."""
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool"],
            max_turns=10,
            model=model,
        )

        result = ""
        total_cost = 0.0
        total_turns = 0

        async for message in query(prompt=task, options=options):
            if hasattr(message, 'cost_usd') and message.cost_usd:
                total_cost += message.cost_usd
            if hasattr(message, 'num_turns'):
                total_turns = message.num_turns
            if hasattr(message, 'result'):
                result = message.result

        return AgentResult(result=result, cost_usd=total_cost, turns=total_turns)

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def create_agent_job(
        self,
        task: str,
        model: str = "claude-sonnet-4-5",
        webhook_url: str | None = None
    ) -> AgentJob:
        """Crear un job async."""
        import uuid
        job_id = str(uuid.uuid4())

        # Lanzar en background
        asyncio.create_task(
            _run_job_background(job_id, task, model, webhook_url)
        )

        return AgentJob(job_id=job_id, status="pending")

async def _run_job_background(job_id: str, task: str, model: str, webhook_url: str | None):
    """Ejecuta el job en background."""
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
        max_turns=25,
        model=model,
    )
    result = ""
    async for message in query(prompt=task, options=options):
        if hasattr(message, 'result'):
            result = message.result
    # Actualizar estado del job en DB...

schema = strawberry.Schema(query=Query, mutation=Mutation)

# Integración con FastAPI
from strawberry.fastapi import GraphQLRouter

graphql_app = GraphQLRouter(schema)
app.include_router(graphql_app, prefix="/graphql")

GraphQL con graphql-yoga (TypeScript)

// graphql/schema.ts
import { createSchema, createYoga } from "graphql-yoga";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type AgentResult {
      result: String!
      turns: Int!
    }

    type AgentJob {
      jobId: String!
      status: String!
      result: String
    }

    type Query {
      runAgent(task: String!, model: String): AgentResult!
    }

    type Mutation {
      createAgentJob(task: String!, model: String, webhookUrl: String): AgentJob!
    }

    type Subscription {
      agentStream(task: String!): AgentEvent!
    }

    type AgentEvent {
      type: String!
      content: String
      result: String
    }
  `,

  resolvers: {
    Query: {
      runAgent: async (_: unknown, { task, model = "claude-haiku-3-5" }: { task: string; model?: string }) => {
        const options: ClaudeCodeOptions = {
          allowedTools: ["View", "GlobTool"],
          maxTurns: 10,
          model,
        };

        let result = "";
        let turns = 0;

        for await (const message of query({ prompt: task, options })) {
          if (message.type === "result") {
            result = message.result;
          }
        }

        return { result, turns };
      },
    },

    Subscription: {
      agentStream: {
        subscribe: async function* (_: unknown, { task }: { task: string }) {
          const options: ClaudeCodeOptions = {
            allowedTools: ["View", "GlobTool"],
            maxTurns: 15,
          };

          for await (const message of query({ prompt: task, options })) {
            if (message.type === "assistant") {
              const content = (message as any).message?.content ?? [];
              for (const block of content) {
                if (block.type === "text" && block.text) {
                  yield {
                    agentStream: {
                      type: "text",
                      content: block.text,
                    },
                  };
                }
              }
            } else if (message.type === "result") {
              yield {
                agentStream: {
                  type: "done",
                  result: message.result,
                },
              };
            }
          }
        },
      },
    },
  },
});

export const yoga = createYoga({ schema });

Cliente GraphQL para consumir subscriptions

// client/agent-client.ts
import { createClient } from "graphql-ws";
import WebSocket from "ws";

const client = createClient({
  url: "ws://localhost:4000/graphql",
  webSocketImpl: WebSocket,
});

async function* streamAgentViaGraphQL(task: string) {
  const subscription = client.iterate<{
    agentStream: { type: string; content?: string; result?: string };
  }>({
    query: /* GraphQL */ `
      subscription AgentStream($task: String!) {
        agentStream(task: $task) {
          type
          content
          result
        }
      }
    `,
    variables: { task },
  });

  for await (const event of subscription) {
    if (event.data) {
      yield event.data.agentStream;
    }
  }
}

// Uso
async function main() {
  const stream = streamAgentViaGraphQL("Analiza mi código Python");

  for await (const event of stream) {
    if (event.type === "text" && event.content) {
      process.stdout.write(event.content);
    } else if (event.type === "done") {
      console.log("\n\n=== Completado ===");
    }
  }
}

main();

Resumen: Elegir el patrón correcto

flowchart TD
    A[¿Cuánto tarda el agente?] -->|"Menos de 10s"| B[Endpoint síncrono]
    A -->|"10s a 5 minutos"| C[SSE o WebSocket]
    A -->|"Más de 5 minutos"| D[Queue con worker]

    B -->|"¿Necesitas streaming?"| E{Streaming}
    E -->|No| F[POST → JSON response]
    E -->|Sí| G[GET → SSE]

    C -->|"¿Bidireccional?"| H{Bidireccional}
    H -->|No| I[SSE: cliente ← servidor]
    H -->|Sí| J[WebSocket: cliente ↔ servidor]

    D -->|"¿Necesitas notificación?"| K{Notificación}
    K -->|Polling| L[GET /jobs/id]
    K -->|Push| M[Webhook al completar]
PatrónFrameworkCaso de uso
SíncronoFastAPI, Flask, Express, HonoTareas cortas (< 10s)
SSEFastAPI, Express, Next.jsTareas largas con progreso
WebSocketExpress + wsConversación bidireccional
Background TaskFastAPI BackgroundTasksTareas de hasta 5 min
Queue (BullMQ)Express + RedisTareas largas, alta concurrencia
Queue (Celery)Flask/FastAPI + RedisTareas largas en Python
GraphQL Subscriptiongraphql-yogaCuando ya tienes GraphQL

Con estos patrones tienes todas las herramientas para integrar el Claude Agent SDK en cualquier arquitectura de aplicación web moderna. El capítulo siguiente cubre patrones avanzados de multi-agente y orquestación.