Chapter 16: Integration with Web Frameworks
The Claude Agent SDK runs in Python and Node.js environments, which makes it compatible with virtually any modern web framework. Integrating an agent into a web API raises a specific problem, though: an agent run can take seconds or minutes, while a traditional HTTP request expects a fast response.
This chapter covers integration patterns for each framework, along with ways to handle that latency.
1. Integration Architecture
The fundamental problem: slow agents over HTTP
A typical HTTP request has to complete in under 30 seconds or so, because many proxies and load balancers enforce a timeout in that range. An agent can take several minutes on a complex task. Bridging that gap requires one of the strategies below.
graph TD
subgraph Sync["Synchronous - for quick tasks under 10s"]
C1[HTTP client] -->|"POST /agent/query"| A1[API]
A1 -->|"Run agent"| SDK1[SDK]
SDK1 -->|"Result"| A1
A1 -->|"200 OK + result"| C1
end
subgraph SSE["SSE - for long tasks with progress"]
C2[HTTP client] -->|"GET /agent/stream"| A2[API]
A2 -->|"Start agent"| SDK2[SDK]
SDK2 -->|"Event 1: tool_use"| A2
A2 -->|"data: {tool_use}"| C2
SDK2 -->|"Event 2: text"| A2
A2 -->|"data: {text}"| C2
SDK2 -->|"Final result"| A2
A2 -->|"data: {done}"| C2
end
subgraph Queue["Queue - for background tasks"]
C3[HTTP client] -->|"POST /agent/job"| A3[API]
A3 -->|"201 + job_id"| C3
A3 -->|"Enqueue task"| Q[Queue Redis/RabbitMQ]
Q -->|"Consume"| W[Worker]
W -->|"Run agent"| SDK3[SDK]
W -->|"POST job_id + result"| WH[Client webhook]
end
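The choice between the three patterns in the diagram can be reduced to a small decision rule. The sketch below is illustrative only: `Strategy` and `choose_strategy` are hypothetical names, and the 10-second threshold is taken from the diagram, not from the SDK.

```python
from enum import Enum


class Strategy(str, Enum):
    SYNC = "sync"    # plain request/response
    SSE = "sse"      # stream progress events over one connection
    QUEUE = "queue"  # background job plus polling or a webhook


def choose_strategy(expected_seconds: float, client_stays_connected: bool = True) -> Strategy:
    """Pick an integration pattern from a rough duration estimate (hypothetical helper)."""
    if expected_seconds < 10:
        return Strategy.SYNC
    if client_stays_connected:
        return Strategy.SSE
    return Strategy.QUEUE
```

In practice the duration estimate is a guess, so it is reasonable to default to SSE or a queue whenever the task is open-ended.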
Mapping agent sessions to web sessions
The SDK starts a new "session" on every call to query(). To keep context across requests from the same user, you have to manage the conversation history explicitly:
Python:
import time


class AgentSessionManager:
    """Manages persistent agent sessions for web users."""

    def __init__(self):
        # In production, use Redis for persistence
        self._sessions: dict[str, dict] = {}

    def get_or_create(self, session_id: str) -> dict:
        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "history": [],
                # session_id ends up in a path: generate it server-side
                "workspace": f"/tmp/workspace-{session_id}",
                "created_at": time.time(),
            }
        return self._sessions[session_id]

    def add_message(self, session_id: str, role: str, content: str):
        session = self.get_or_create(session_id)
        session["history"].append({"role": role, "content": content})
        # Keep only the last 10 messages to avoid huge contexts
        if len(session["history"]) > 10:
            session["history"] = session["history"][-10:]

    def get_context(self, session_id: str) -> str:
        """Builds a summary of the previous context to include in the prompt."""
        session = self.get_or_create(session_id)
        if not session["history"]:
            return ""
        lines = ["Previous context for this session:"]
        for msg in session["history"][-5:]:  # Only the last 5
            lines.append(f"[{msg['role']}]: {msg['content'][:100]}")
        return "\n".join(lines)


session_manager = AgentSessionManager()
TypeScript:
interface AgentSession {
history: Array<{ role: string; content: string }>;
workspace: string;
createdAt: number;
}
class AgentSessionManager {
private sessions: Map<string, AgentSession> = new Map();
getOrCreate(sessionId: string): AgentSession {
if (!this.sessions.has(sessionId)) {
this.sessions.set(sessionId, {
history: [],
workspace: `/tmp/workspace-${sessionId}`,
createdAt: Date.now(),
});
}
return this.sessions.get(sessionId)!;
}
addMessage(sessionId: string, role: string, content: string): void {
const session = this.getOrCreate(sessionId);
session.history.push({ role, content });
if (session.history.length > 10) {
session.history = session.history.slice(-10);
}
}
getContext(sessionId: string): string {
const session = this.getOrCreate(sessionId);
if (session.history.length === 0) return "";
const lines = ["Contexto previo:"];
for (const msg of session.history.slice(-5)) {
lines.push(`[${msg.role}]: ${msg.content.slice(0, 100)}`);
}
return lines.join("\n");
}
}
const sessionManager = new AgentSessionManager();
2. FastAPI (Python)
FastAPI is one of the most widely used Python frameworks for modern APIs. Its native async support makes it a good fit for agents.
Base project setup
pip install fastapi uvicorn pyjwt slowapi httpx claude-code-sdk
Synchronous endpoint (for short tasks, under 30s)
# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage

app = FastAPI(title="Claude Agent API", version="1.0.0")


class AgentRequest(BaseModel):
    task: str
    session_id: str | None = None


class AgentResponse(BaseModel):
    result: str
    cost_usd: float
    turns: int


@app.post("/agent/query", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    """
    Synchronous endpoint: the client waits until the agent finishes.
    Only suitable for tasks that take less than 30 seconds.
    """
    try:
        options = ClaudeCodeOptions(
            allowed_tools=["Read", "Glob", "Grep"],
            max_turns=10,
            model="claude-3-5-haiku-latest",
        )
        result = ""
        total_cost = 0.0
        total_turns = 0
        async for message in query(prompt=request.task, options=options):
            # The final ResultMessage carries the answer, cost, and turn count
            if isinstance(message, ResultMessage):
                result = message.result or ""
                total_cost = message.total_cost_usd or 0.0
                total_turns = message.num_turns
        return AgentResponse(result=result, cost_usd=total_cost, turns=total_turns)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
SSE endpoint for streaming
Server-Sent Events (SSE) lets the server push events to the client in real time over a single long-lived HTTP connection:
from fastapi import Request
from fastapi.responses import StreamingResponse
from claude_code_sdk import (
    query, ClaudeCodeOptions,
    AssistantMessage, ResultMessage, TextBlock, ToolUseBlock,
)
import json


@app.get("/agent/stream")
async def stream_agent(task: str, request: Request):
    """
    SSE endpoint. The client receives events as the agent works.
    Suited to long tasks where the user wants to see progress.
    """
    async def event_generator():
        try:
            options = ClaudeCodeOptions(
                allowed_tools=["Read", "Glob", "Grep", "Edit"],
                max_turns=20,
                model="claude-sonnet-4-5",
            )
            async for message in query(prompt=task, options=options):
                # Stop if the client disconnected
                if await request.is_disconnected():
                    break
                events = []
                if isinstance(message, AssistantMessage):
                    for block in message.content:
                        if isinstance(block, TextBlock) and block.text:
                            # Assistant text
                            events.append({"type": "text", "content": block.text})
                        elif isinstance(block, ToolUseBlock):
                            # Report which tool is being used
                            events.append({"type": "tool_use", "tool": block.name})
                elif isinstance(message, ResultMessage):
                    events.append({
                        "type": "done",
                        "result": message.result,
                        "cost_usd": message.total_cost_usd or 0,
                    })
                # One message can hold several blocks: emit one event per block
                for event in events:
                    yield f"data: {json.dumps(event)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
        # Sent after the try block: yielding inside `finally` fails when the
        # generator is being closed because the client went away
        yield "data: {\"type\": \"close\"}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Important behind nginx
            "Access-Control-Allow-Origin": "*",  # Restrict this in production
        },
    )
Bidirectional WebSocket endpoint
WebSocket allows two-way communication: the client can send new instructions while the agent is working:
from fastapi import WebSocket, WebSocketDisconnect
from claude_code_sdk import (
    query, ClaudeCodeOptions, AssistantMessage, ResultMessage, TextBlock,
)
import asyncio
import json


async def _run_task(websocket: WebSocket, task: str) -> None:
    """Runs one agent task and forwards its messages to the socket."""
    options = ClaudeCodeOptions(
        allowed_tools=["Read", "Glob", "Grep", "Edit"],
        max_turns=20,
    )
    try:
        async for message in query(prompt=task, options=options):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if isinstance(block, TextBlock) and block.text:
                        await websocket.send_json({"type": "text", "content": block.text})
            elif isinstance(message, ResultMessage):
                await websocket.send_json({"type": "done", "result": message.result})
    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})


@app.websocket("/agent/ws/{session_id}")
async def websocket_agent(websocket: WebSocket, session_id: str):
    """
    Bidirectional WebSocket.
    The agent runs in its own asyncio task so the receive loop stays free
    to handle a "cancel" message while the agent is working.
    """
    await websocket.accept()
    agent_task: asyncio.Task | None = None
    try:
        while True:
            # Wait for a message from the client
            request = json.loads(await websocket.receive_text())
            if request.get("type") == "task":
                if agent_task and not agent_task.done():
                    await websocket.send_json(
                        {"type": "error", "message": "A task is already running"}
                    )
                    continue
                await websocket.send_json({"type": "status", "message": "Agent started"})
                agent_task = asyncio.create_task(_run_task(websocket, request["task"]))
            elif request.get("type") == "cancel":
                if agent_task:
                    agent_task.cancel()
                await websocket.send_json({"type": "cancelled"})
                break
    except WebSocketDisconnect:
        print(f"[WS] Client disconnected: {session_id}")
    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})
    finally:
        # Never leave an agent running after the socket is gone
        if agent_task and not agent_task.done():
            agent_task.cancel()
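Background tasks with FastAPI
For tasks that take longer than 30 seconds, use background tasks. FastAPI's BackgroundTasks run inside the API process, so pending jobs are lost if the server restarts; use a real queue when jobs must survive a restart: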
from fastapi import BackgroundTasks, HTTPException
from pydantic import BaseModel
import uuid
import httpx
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage

# Job storage (in production, use Redis)
jobs: dict[str, dict] = {}


class JobRequest(BaseModel):
    task: str
    webhook_url: str | None = None  # Validate against an allowlist in production


class JobStatus(BaseModel):
    job_id: str
    status: str  # "pending" | "running" | "completed" | "failed"
    result: str | None = None
    error: str | None = None


@app.post("/agent/jobs", status_code=201)
async def create_job(request: JobRequest, background_tasks: BackgroundTasks):
    """Creates a background job. Returns immediately with the job_id."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "pending", "result": None, "error": None}
    background_tasks.add_task(run_agent_job, job_id, request.task, request.webhook_url)
    return {"job_id": job_id, "status": "pending"}


@app.get("/agent/jobs/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """Returns the current state of a job."""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobStatus(job_id=job_id, **jobs[job_id])


async def notify_webhook(webhook_url: str | None, payload: dict) -> None:
    """Best-effort delivery: a failed webhook must not change the job status."""
    if not webhook_url:
        return
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            await client.post(webhook_url, json=payload)
    except httpx.HTTPError as e:
        print(f"[JOB] Webhook delivery failed: {e}")


async def run_agent_job(job_id: str, task: str, webhook_url: str | None):
    """Runs in the background after the response has been sent."""
    jobs[job_id]["status"] = "running"
    try:
        options = ClaudeCodeOptions(
            allowed_tools=["Read", "Glob", "Grep", "Edit"],
            max_turns=30,
            model="claude-sonnet-4-5",
        )
        result = ""
        async for message in query(prompt=task, options=options):
            if isinstance(message, ResultMessage):
                result = message.result or ""
        jobs[job_id].update(status="completed", result=result)
        payload = {"job_id": job_id, "status": "completed", "result": result}
    except Exception as e:
        jobs[job_id].update(status="failed", error=str(e))
        payload = {"job_id": job_id, "status": "failed", "error": str(e)}
    # Notify via webhook if one was configured
    await notify_webhook(webhook_url, payload)
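A client that does not register a webhook has to poll the job status endpoint. The sketch below shows one way to do that with exponential backoff. It is a hypothetical helper: `poll_job` takes the HTTP call as an injected `fetch_status` function so the loop can be exercised without a server, and the interval and timeout defaults are arbitrary.

```python
import time
from typing import Callable

TERMINAL_STATES = {"completed", "failed"}


def poll_job(
    fetch_status: Callable[[], dict],
    interval: float = 1.0,
    max_interval: float = 30.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the job reaches a terminal state, doubling the wait each time."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL_STATES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish in time")
        sleep(interval)
        interval = min(interval * 2, max_interval)
```

In a real client, `fetch_status` would wrap a GET to `/agent/jobs/{job_id}` and return the decoded JSON body.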
Rate limiting with slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/agent/query")
@limiter.limit("10/hour")  # 10 queries per hour per IP
async def rate_limited_query(request: Request, body: AgentRequest):
    # ... endpoint implementation
    pass
JWT authentication in FastAPI
from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
import jwt  # PyJWT
import os

security = HTTPBearer()


def verify_jwt(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """FastAPI dependency that verifies the JWT."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.environ["JWT_SECRET"],
            algorithms=["HS256"],
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")


# `app`, `limiter`, and `AgentRequest` come from the earlier snippets
@app.post("/agent/query")
@limiter.limit("20/hour")
async def authenticated_query(
    request: Request,
    body: AgentRequest,
    user: dict = Depends(verify_jwt),
):
    """Endpoint protected by JWT authentication."""
    user_id = user["sub"]
    print(f"[AUDIT] Query from user: {user_id}")
    options = ClaudeCodeOptions(
        allowed_tools=["Read", "Glob"],
        max_turns=10,
    )
    result = ""
    async for message in query(prompt=body.task, options=options):
        if isinstance(message, ResultMessage):
            result = message.result or ""
    return {"result": result, "user": user_id}
Complete agent API with FastAPI
# api/main.py - Complete example
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
import json
import uuid
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI(title="Claude Agent API")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

jobs: dict = {}


class QueryRequest(BaseModel):
    task: str
    model: str = "claude-3-5-haiku-latest"
    max_turns: int = 10


@app.get("/health")
async def health():
    return {"status": "ok"}


@app.post("/agent/query")
@limiter.limit("20/hour")
async def sync_query(request: Request, body: QueryRequest):
    options = ClaudeCodeOptions(
        allowed_tools=["Read", "Glob", "Grep"],
        max_turns=min(body.max_turns, 20),  # Cap client-supplied turns
        model=body.model,
    )
    result = ""
    async for message in query(prompt=body.task, options=options):
        if isinstance(message, ResultMessage):
            result = message.result or ""
    return {"result": result}


@app.get("/agent/stream")
async def stream_query(task: str, request: Request):
    async def gen():
        options = ClaudeCodeOptions(
            allowed_tools=["Read", "Glob", "Grep"],
            max_turns=20,
        )
        async for message in query(prompt=task, options=options):
            if await request.is_disconnected():
                break
            if isinstance(message, ResultMessage):
                yield f"data: {json.dumps({'type': 'done', 'result': message.result})}\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")


@app.post("/agent/jobs", status_code=201)
async def create_job(body: QueryRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "pending"}
    background_tasks.add_task(run_job, job_id, body.task, body.model, body.max_turns)
    return {"job_id": job_id}


@app.get("/agent/jobs/{job_id}")
async def get_job(job_id: str):
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    return jobs[job_id]


async def run_job(job_id: str, task: str, model: str, max_turns: int):
    jobs[job_id]["status"] = "running"
    try:
        options = ClaudeCodeOptions(
            allowed_tools=["Read", "Glob", "Grep", "Edit"],
            max_turns=max_turns,
            model=model,
        )
        result = ""
        async for message in query(prompt=task, options=options):
            if isinstance(message, ResultMessage):
                result = message.result or ""
        jobs[job_id] = {"status": "completed", "result": result}
    except Exception as e:
        jobs[job_id] = {"status": "failed", "error": str(e)}
3. Flask (Python)
Flask is simpler than FastAPI, but its views are synchronous by default, so the examples below drive the SDK's coroutines from an explicitly created event loop:
pip install flask claude-code-sdk
Basic Flask integration
# app.py
from flask import Flask, request, jsonify, Response, stream_with_context
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
import asyncio
import json

app = Flask(__name__)


def run_async(coro):
    """Run a coroutine to completion from a synchronous Flask view."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


@app.route("/agent/query", methods=["POST"])
def agent_query():
    """Synchronous endpoint for Flask."""
    data = request.get_json(silent=True) or {}
    task = data.get("task", "")

    async def _run():
        options = ClaudeCodeOptions(
            allowed_tools=["Read", "Glob"],
            max_turns=10,
            model="claude-3-5-haiku-latest",
        )
        result = ""
        async for message in query(prompt=task, options=options):
            if isinstance(message, ResultMessage):
                result = message.result or ""
        return result

    try:
        result = run_async(_run())
        return jsonify({"result": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
SSE with Flask
@app.route("/agent/stream")
def agent_stream():
    """SSE with Flask."""
    task = request.args.get("task", "")

    def generate():
        async def _run():
            options = ClaudeCodeOptions(
                allowed_tools=["Read", "Glob"],
                max_turns=15,
            )
            async for message in query(prompt=task, options=options):
                if isinstance(message, ResultMessage):
                    yield f"data: {json.dumps({'type': 'done', 'result': message.result})}\n\n"

        # Drive the async generator from a synchronous loop
        loop = asyncio.new_event_loop()
        agen = _run()
        try:
            while True:
                try:
                    event = loop.run_until_complete(agen.__anext__())
                    yield event
                except StopAsyncIteration:
                    break
        finally:
            # Also runs when the client disconnects mid-stream
            loop.run_until_complete(agen.aclose())
            loop.close()

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
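The loop that drives an async generator from synchronous code can be factored into a reusable bridge. The sketch below is one way to do it, using only the standard library. `iter_over_async` is a hypothetical helper, and `fake_agent_events` is a stand-in for the SDK's `query()` so the example runs without credentials.

```python
import asyncio
from typing import AsyncGenerator, Callable, Iterator, TypeVar

T = TypeVar("T")


def iter_over_async(make_agen: Callable[[], AsyncGenerator[T, None]]) -> Iterator[T]:
    """Drive an async generator from synchronous code, one item at a time."""
    loop = asyncio.new_event_loop()
    agen = make_agen()
    try:
        while True:
            try:
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        # Runs on normal exhaustion and when the consumer stops early
        loop.run_until_complete(agen.aclose())
        loop.close()


async def fake_agent_events() -> AsyncGenerator[str, None]:
    """Stand-in for the SDK's query(); yields SSE-formatted strings."""
    for i in range(3):
        await asyncio.sleep(0)
        yield f"data: step {i}\n\n"
```

A Flask view could then return `Response(iter_over_async(fake_agent_events), mimetype="text/event-stream")`. Each request still ties up one worker thread for the whole agent run, which is the main cost of this approach compared with a natively async framework.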
Flask sessions mapped to agent sessions
from flask import Flask, session, request, jsonify
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
import asyncio
import os
import uuid

app = Flask(__name__)
# Load the key from the environment; FLASK_SECRET_KEY is an assumed variable name
app.secret_key = os.environ["FLASK_SECRET_KEY"]

# Conversation history per session_id (in production: Redis)
conversation_histories: dict[str, list] = {}


@app.route("/agent/chat", methods=["POST"])
def chat():
    """Endpoint that keeps context between messages."""
    flask_session_id = session.get("session_id")
    if not flask_session_id:
        flask_session_id = str(uuid.uuid4())
        session["session_id"] = flask_session_id

    data = request.get_json(silent=True) or {}
    user_message = data.get("message", "")

    # Fetch the previous history
    history = conversation_histories.get(flask_session_id, [])

    # Build the prompt with context
    context = "\n".join([
        f"[{h['role']}]: {h['content'][:200]}"
        for h in history[-5:]  # Only the last 5 messages
    ])
    full_prompt = f"{context}\n\n[user]: {user_message}" if context else user_message

    async def _run():
        options = ClaudeCodeOptions(
            allowed_tools=["Read", "Glob"],
            max_turns=10,
        )
        result = ""
        async for message in query(prompt=full_prompt, options=options):
            if isinstance(message, ResultMessage):
                result = message.result or ""
        return result

    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(_run())
    finally:
        loop.close()

    # Update the history
    history.append({"role": "user", "content": user_message})
    history.append({"role": "agent", "content": result})
    conversation_histories[flask_session_id] = history[-20:]  # Keep the last 20

    return jsonify({
        "result": result,
        "session_id": flask_session_id,
    })
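The prompt-building step in the chat endpoint is easier to test when it is pulled out into a pure function. The sketch below does that, keeping the same limits the endpoint uses (last 5 messages, 200 characters each) as defaults. `build_prompt` and the `[user]` role label are illustrative choices, not an SDK convention.

```python
def build_prompt(
    history: list[dict],
    user_message: str,
    max_messages: int = 5,
    max_chars: int = 200,
) -> str:
    """Prefix the new message with a truncated view of recent history."""
    recent = history[-max_messages:] if max_messages > 0 else []
    context = "\n".join(
        f"[{h['role']}]: {h['content'][:max_chars]}" for h in recent
    )
    return f"{context}\n\n[user]: {user_message}" if context else user_message
```

Truncating each message keeps the prompt small, but it also discards detail, so a summary of older turns may serve long conversations better.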
4. Express.js (TypeScript)
Express is one of the most widely used Node.js frameworks. With TypeScript and the SDK, the integration is straightforward.
Project setup
npm init -y
npm install express @anthropic-ai/claude-code-sdk
npm install -D typescript @types/express @types/node ts-node
Basic endpoint
// src/server.ts
import express, { Request, Response, NextFunction } from "express";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const app = express();
app.use(express.json());
interface QueryBody {
task: string;
model?: string;
}
app.post("/agent/query", async (req: Request<{}, {}, QueryBody>, res: Response) => {
const { task, model = "claude-3-5-haiku-latest" } = req.body;
if (!task) {
return res.status(400).json({ error: "task es requerido" });
}
try {
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob", "Grep"],
maxTurns: 10,
model,
};
let result = "";
for await (const message of query({ prompt: task, options })) {
if (message.type === "result") {
result = message.result;
}
}
res.json({ result });
} catch (error) {
res.status(500).json({ error: (error as Error).message });
}
});
app.listen(3000, () => console.log("Servidor en http://localhost:3000"));
SSE with Express
import express, { Request, Response } from "express";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
app.get("/agent/stream", async (req: Request, res: Response) => {
const task = req.query.task as string;
if (!task) {
return res.status(400).json({ error: "task es requerido" });
}
// Configurar headers SSE
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("Access-Control-Allow-Origin", "*");
res.flushHeaders();
const sendEvent = (data: object) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
try {
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob", "Grep", "Edit"],
maxTurns: 20,
};
for await (const message of query({ prompt: task, options })) {
// Verificar si el cliente se desconectó
if (req.socket.destroyed) break;
if (message.type === "assistant") {
const content = (message as any).message?.content ?? [];
for (const block of content) {
if (block.type === "text" && block.text) {
sendEvent({ type: "text", content: block.text });
}
}
} else if (message.type === "result") {
sendEvent({ type: "done", result: message.result });
}
}
} catch (error) {
sendEvent({ type: "error", message: (error as Error).message });
} finally {
res.end();
}
});
WebSocket with the ws library
import { WebSocketServer, WebSocket } from "ws";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import { createServer } from "http";
import express from "express";
const expressApp = express();
const server = createServer(expressApp);
const wss = new WebSocketServer({ server });
wss.on("connection", (ws: WebSocket, req) => {
console.log(`[WS] Cliente conectado desde ${req.socket.remoteAddress}`);
ws.on("message", async (rawData) => {
let parsed: { type: string; task?: string };
try {
parsed = JSON.parse(rawData.toString());
} catch {
ws.send(JSON.stringify({ type: "error", message: "JSON inválido" }));
return;
}
if (parsed.type === "task" && parsed.task) {
const task = parsed.task;
ws.send(JSON.stringify({ type: "status", message: "Iniciando agente..." }));
try {
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob", "Grep", "Edit"],
maxTurns: 25,
};
for await (const message of query({ prompt: task, options })) {
if (ws.readyState !== WebSocket.OPEN) break;
if (message.type === "assistant") {
const content = (message as any).message?.content ?? [];
for (const block of content) {
if (block.type === "text" && block.text) {
ws.send(JSON.stringify({ type: "text", content: block.text }));
}
}
} else if (message.type === "result") {
ws.send(JSON.stringify({ type: "done", result: message.result }));
}
}
} catch (error) {
ws.send(JSON.stringify({ type: "error", message: (error as Error).message }));
}
}
});
ws.on("close", () => console.log("[WS] Cliente desconectado"));
});
server.listen(3000, () => console.log("Servidor en puerto 3000"));
Authentication middleware
import { Request, Response, NextFunction } from "express";
import jwt from "jsonwebtoken";
interface AuthenticatedRequest extends Request {
user?: { sub: string; role: string };
}
function authMiddleware(req: AuthenticatedRequest, res: Response, next: NextFunction): void {
const authHeader = req.headers.authorization;
if (!authHeader?.startsWith("Bearer ")) {
res.status(401).json({ error: "Token requerido" });
return;
}
const token = authHeader.slice(7);
try {
const payload = jwt.verify(token, process.env.JWT_SECRET!) as {
sub: string;
role: string;
};
req.user = payload;
next();
} catch (error) {
res.status(401).json({ error: "Token inválido o expirado" });
}
}
// Aplicar a rutas protegidas
app.post("/agent/query", authMiddleware, async (req: AuthenticatedRequest, res: Response) => {
console.log(`[AUDIT] Query de usuario: ${req.user?.sub}`);
// ... implementación
});
Error handling middleware
import { Request, Response, NextFunction } from "express";
// Debe ir al final de todos los middlewares
app.use((error: Error, req: Request, res: Response, next: NextFunction) => {
console.error(`[ERROR] ${req.method} ${req.path}:`, error.message);
if (error.name === "ValidationError") {
return res.status(400).json({ error: error.message });
}
if (error.name === "UnauthorizedError") {
return res.status(401).json({ error: "No autorizado" });
}
// Error genérico: no exponer detalles internos
res.status(500).json({
error: "Error interno del servidor",
requestId: req.headers["x-request-id"],
});
});
5. Hono (TypeScript/Edge)
Hono is an ultralight framework that runs on edge runtimes (Cloudflare Workers, Deno Deploy, Vercel Edge). For simple APIs it generally adds less per-request overhead than Express.
Why Hono for agents
- Speed: up to 10x faster than Express in synthetic benchmarks, although agent latency will dominate in practice
- Edge-compatible: runs on Cloudflare Workers, Deno, Bun
- Native TypeScript: no additional type packages needed
- Small bundle: under 15KB
Basic Hono setup
npm create hono@latest my-agent-api
# Select: nodejs (for a server) or cloudflare-workers (for edge)
SSE with Hono
// src/index.ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const app = new Hono();
app.get("/agent/stream", (c) => {
const task = c.req.query("task");
if (!task) {
return c.json({ error: "task es requerido" }, 400);
}
return streamSSE(c, async (stream) => {
try {
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob", "Grep"],
maxTurns: 15,
model: "claude-sonnet-4-5",
};
for await (const message of query({ prompt: task, options })) {
if (message.type === "assistant") {
const content = (message as any).message?.content ?? [];
for (const block of content) {
if (block.type === "text" && block.text) {
await stream.writeSSE({
data: JSON.stringify({ type: "text", content: block.text }),
});
}
}
} else if (message.type === "result") {
await stream.writeSSE({
data: JSON.stringify({ type: "done", result: message.result }),
event: "done",
});
}
}
} catch (error) {
await stream.writeSSE({
data: JSON.stringify({ type: "error", message: (error as Error).message }),
event: "error",
});
}
});
});
app.post("/agent/query", async (c) => {
const { task, model = "claude-3-5-haiku-latest" } = await c.req.json<{
task: string;
model?: string;
}>();
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob"],
maxTurns: 10,
model,
};
let result = "";
for await (const message of query({ prompt: task, options })) {
if (message.type === "result") result = message.result;
}
return c.json({ result });
});
export default app;
Deploying to Cloudflare Workers
Cloudflare Workers has no real filesystem. The agent can process data in memory but cannot read files from disk. Before targeting an edge runtime, verify that the SDK actually runs there: it is built for Node.js, and Workers provides only partial Node.js compatibility. Valid use cases:
- Analyzing code sent in the request body
- Processing JSON data
- Generating code or text
- Classifying or transforming content
// worker.ts - Para Cloudflare Workers
import { Hono } from "hono";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const app = new Hono<{ Bindings: { ANTHROPIC_API_KEY: string } }>();
app.post("/analyze", async (c) => {
const { code, question } = await c.req.json<{
code: string;
question: string;
}>();
// En edge: los datos vienen en el request, no del filesystem
const options: ClaudeCodeOptions = {
allowedTools: [], // Sin herramientas de filesystem en edge
maxTurns: 5,
model: "claude-3-5-haiku-latest",
};
const prompt = `
Analiza el siguiente código y responde: ${question}
CÓDIGO:
\`\`\`
${code}
\`\`\`
`;
let result = "";
for await (const message of query({ prompt, options })) {
if (message.type === "result") result = message.result;
}
return c.json({ result });
});
export default app;
Edge limitations and how to handle them
graph TD
A[Request to the edge Worker] -->|"Needs filesystem?"| B{Decision}
B -->|No - processes data in memory| C[Hono on edge]
B -->|Yes - needs to read files| D[Redirect to Node.js server]
C -->|Direct result| E[Response to client]
D -->|Proxy to server with filesystem| F[Node.js API with filesystem]
F -->|Result| E
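The decision in the diagram can be made mechanically from the tools a request needs. The sketch below is a minimal illustration: `pick_runtime` is a hypothetical helper, and the contents of `FILESYSTEM_TOOLS` are an assumed list of tool names that touch the disk, which should be adjusted to the tools your SDK version actually exposes.

```python
# Assumed names of tools that need a real filesystem or shell
FILESYSTEM_TOOLS = {"Read", "Write", "Edit", "Glob", "Grep", "Bash"}


def pick_runtime(requested_tools: list[str]) -> str:
    """Return "edge" when no requested tool touches the filesystem, else "node"."""
    return "node" if FILESYSTEM_TOOLS.intersection(requested_tools) else "edge"
```

An edge Worker would call this before doing any work and proxy the request to the Node.js API whenever the answer is `"node"`.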
6. Next.js (TypeScript)
Next.js 14+ with the App Router supports streaming responses and Server Actions, both of which work with the SDK.
API Routes with a streaming response
// app/api/agent/route.ts
import { NextRequest, NextResponse } from "next/server";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
export const runtime = "nodejs"; // Importante: no edge, necesita Node.js
export async function POST(req: NextRequest): Promise<Response> {
const { task } = await req.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob", "Grep"],
maxTurns: 15,
model: "claude-sonnet-4-5",
};
for await (const message of query({ prompt: task, options })) {
if (message.type === "assistant") {
const content = (message as any).message?.content ?? [];
for (const block of content) {
if (block.type === "text" && block.text) {
const event = `data: ${JSON.stringify({
type: "text",
content: block.text,
})}\n\n`;
controller.enqueue(encoder.encode(event));
}
}
} else if (message.type === "result") {
const event = `data: ${JSON.stringify({
type: "done",
result: message.result,
})}\n\n`;
controller.enqueue(encoder.encode(event));
}
}
} catch (error) {
const event = `data: ${JSON.stringify({
type: "error",
message: (error as Error).message,
})}\n\n`;
controller.enqueue(encoder.encode(event));
} finally {
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
Server Actions with an agent
// app/actions/agent.ts
"use server";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
export async function runAgentAction(
task: string
): Promise<{ result?: string; error?: string }> {
"use server";
try {
const options: ClaudeCodeOptions = {
allowedTools: ["Read", "Glob"],
maxTurns: 10,
model: "claude-3-5-haiku-latest",
};
let result = "";
for await (const message of query({ prompt: task, options })) {
if (message.type === "result") {
result = message.result;
}
}
return { result };
} catch (error) {
return { error: (error as Error).message };
}
}
React component with streaming
// app/components/AgentChat.tsx
"use client";
import { useState, useRef } from "react";
interface Message {
  role: "user" | "agent";
  content: string;
}
export function AgentChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState("");
  const [isLoading, setIsLoading] = useState(false);
  const [streamingContent, setStreamingContent] = useState("");
  const abortRef = useRef<AbortController | null>(null);
  const sendMessage = async () => {
    if (!input.trim() || isLoading) return;
    const userMessage = input.trim();
    setInput("");
    setIsLoading(true);
    setStreamingContent("");
    setMessages((prev) => [...prev, { role: "user", content: userMessage }]);
    abortRef.current = new AbortController();
    try {
      const response = await fetch("/api/agent", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ task: userMessage }),
        signal: abortRef.current.signal,
      });
      if (!response.body) throw new Error("No response body");
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulated = "";
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // A network chunk can end mid-event, so buffer it and parse only complete lines.
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? "";
        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const data = JSON.parse(line.slice(6));
          if (data.type === "text") {
            accumulated += data.content;
            setStreamingContent(accumulated);
          } else if (data.type === "done") {
            setMessages((prev) => [
              ...prev,
              { role: "agent", content: data.result },
            ]);
            setStreamingContent("");
          } else if (data.type === "error") {
            // The server reports agent failures as an "error" event.
            throw new Error(data.message);
          }
        }
      }
    } catch (error) {
      if ((error as Error).name !== "AbortError") {
        setMessages((prev) => [
          ...prev,
          { role: "agent", content: `Error: ${(error as Error).message}` },
        ]);
      }
    } finally {
      setIsLoading(false);
      setStreamingContent("");
    }
  };
  return (
    <div className="flex flex-col h-screen max-w-2xl mx-auto p-4">
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`p-3 rounded-lg ${
              msg.role === "user"
                ? "bg-blue-100 ml-12"
                : "bg-gray-100 mr-12"
            }`}
          >
            <span className="font-bold text-xs text-gray-500">
              {msg.role === "user" ? "You" : "Agent"}
            </span>
            <p className="mt-1 whitespace-pre-wrap">{msg.content}</p>
          </div>
        ))}
        {streamingContent && (
          <div className="bg-gray-100 mr-12 p-3 rounded-lg">
            <span className="font-bold text-xs text-gray-500">Agent</span>
            <p className="mt-1 whitespace-pre-wrap">{streamingContent}</p>
            <span className="animate-pulse">▋</span>
          </div>
        )}
      </div>
      <div className="flex gap-2">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === "Enter" && !e.shiftKey && sendMessage()}
          placeholder="Describe your task..."
          className="flex-1 border rounded-lg px-3 py-2"
          disabled={isLoading}
        />
        <button
          onClick={sendMessage}
          disabled={isLoading || !input.trim()}
          className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
        >
          {isLoading ? "..." : "Send"}
        </button>
        {isLoading && (
          <button
            onClick={() => abortRef.current?.abort()}
            className="bg-red-500 text-white px-4 py-2 rounded-lg"
          >
            Cancel
          </button>
        )}
      </div>
    </div>
  );
}
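The reader loop in this component receives network chunks that may end in the middle of an SSE event. As a minimal sketch, the buffering can be isolated in a pure helper so its chunk-boundary behavior is easy to check outside the browser. `createSseLineParser` and `SseEvent` are hypothetical names used only for illustration; they are not part of the SDK or Next.js.

```typescript
// Hypothetical helper: names are illustrative, not from the SDK or Next.js.
interface SseEvent {
  type: string;
  content?: string;
  result?: string;
  message?: string;
}

// Returns a function that accepts raw text chunks and returns only the events
// whose "data: ..." line has been received completely.
function createSseLineParser(): (chunk: string) => SseEvent[] {
  let buffer = "";
  return (chunk: string): SseEvent[] => {
    buffer += chunk;
    const lines = buffer.split("\n");
    // The last element is "" (chunk ended on a newline) or a partial line.
    buffer = lines.pop() ?? "";
    const events: SseEvent[] = [];
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue; // skip blank separator lines
      events.push(JSON.parse(line.slice(6)) as SseEvent);
    }
    return events;
  };
}

// Usage: an event split across two chunks is returned only once it is complete.
const parseChunk = createSseLineParser();
const firstBatch = parseChunk('data: {"type":"te');
const secondBatch = parseChunk('xt","content":"hello"}\n\n');
```

Here `firstBatch` is empty because the line is still incomplete, and `secondBatch` holds the single reassembled event.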
App Router vs Pages Router
| Feature | App Router (Next.js 13+) | Pages Router (legacy) |
|---|---|---|
| Streaming | Native via ReadableStream | Requires a custom response |
| Server Actions | Native | Not available |
| Runtime | Edge or Node.js per file | Global |
| Recommendation | Use for new projects | Keep in existing projects |
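Because the App Router lets each file pick its runtime, the route that hosts the agent can pin it explicitly. The sketch below assumes the handler lives in `app/api/agent/route.ts` (inferred from the `/api/agent` URL the component calls) and that the agent must run on Node.js rather than Edge. The 300-second limit is an illustrative value, and whether it is honored depends on the hosting platform.

```typescript
// app/api/agent/route.ts (route segment config, placed next to the POST handler)

// Run this route on the Node.js runtime rather than Edge.
export const runtime = "nodejs";

// Never cache or prerender the streamed response.
export const dynamic = "force-dynamic";

// Upper bound in seconds for a single request (assumption: the deployment
// platform allows and enforces this value).
export const maxDuration = 300;
```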
7. Queue-based Integration
For tasks that can take minutes, queue-based integration is the most robust option.
Architecture with BullMQ (Node.js)
sequenceDiagram
participant W as Web App
participant API as API Server
participant Q as Redis Queue
participant WK as Worker
participant SDK as Claude SDK
participant WH as Webhook
W->>API: POST /agent/jobs {task}
API->>Q: bull.add(job)
API-->>W: 201 {job_id}
loop Optional polling
W->>API: GET /agent/jobs/{job_id}
API-->>W: {status: "pending"}
end
Q->>WK: Consume job
WK->>SDK: query(task)
SDK-->>WK: Result
WK->>API: Update job status
WK->>WH: POST result to webhook
WH-->>W: Completion notification
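The optional polling loop in the diagram can be sketched on the client side as follows. This is a minimal sketch, not part of the SDK or BullMQ: `pollJob`, `nextPollDelayMs`, and the `JobStatus` shape are hypothetical. The status fetcher is injected so the example assumes nothing about the HTTP client or URL.

```typescript
// Hypothetical client-side polling helper; names and shapes are illustrative.
interface JobStatus {
  status: string; // e.g. "waiting", "active", "completed", "failed"
  result?: string;
  error?: string;
}

// Delay before the next poll (attempt is 0-based): starts at baseMs,
// doubles each time, and never exceeds maxMs.
function nextPollDelayMs(attempt: number, baseMs = 1000, maxMs = 15000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls getStatus() until the job reaches a terminal state or the
// attempt budget is exhausted.
async function pollJob(
  getStatus: () => Promise<JobStatus>,
  maxAttempts = 20
): Promise<JobStatus> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const job = await getStatus();
    if (job.status === "completed" || job.status === "failed") return job;
    await sleep(nextPollDelayMs(attempt));
  }
  throw new Error(`Job still pending after ${maxAttempts} polls`);
}
```

With the endpoint shown in the diagram, `getStatus` could be something like `() => fetch("/agent/jobs/" + jobId).then((r) => r.json())`. Growing the delay keeps long jobs from flooding the API with status requests.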
Implementation with BullMQ
// queue/worker.ts
import { Worker, Queue, Job } from "bullmq";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import axios from "axios";
interface AgentJobData {
  jobId: string;
  task: string;
  model: string;
  webhookUrl?: string;
  userId: string;
}
// Redis connection
const connection = { host: "localhost", port: 6379 };
// Job queue
export const agentQueue = new Queue<AgentJobData>("agent-jobs", { connection });
// Worker that processes the jobs
const worker = new Worker<AgentJobData>(
  "agent-jobs",
  async (job: Job<AgentJobData>) => {
    const { task, model, webhookUrl, userId, jobId } = job.data;
    console.log(`[WORKER] Processing job ${jobId} for user ${userId}`);
    const options: ClaudeCodeOptions = {
      allowedTools: ["View", "GlobTool", "GrepTool", "Edit"],
      maxTurns: 30,
      model,
    };
    // Report initial progress on the job
    await job.updateProgress(10);
    let result = "";
    let turns = 0;
    for await (const message of query({ prompt: task, options })) {
      if (message.type === "result") {
        result = message.result;
      }
      // Each streamed message counts as one step; progress is a rough estimate capped at 90%
      turns++;
      const progress = Math.min(10 + turns * 3, 90);
      await job.updateProgress(progress);
    }
    await job.updateProgress(100);
    // Notify via webhook
    if (webhookUrl) {
      try {
        await axios.post(webhookUrl, {
          jobId,
          status: "completed",
          result,
          userId,
        });
      } catch (webhookError) {
        console.error(`[WORKER] Webhook error: ${webhookError}`);
        // Do not fail the job because of a webhook error
      }
    }
    return { result, turns };
  },
  {
    connection,
    concurrency: 5, // At most 5 jobs in parallel
    limiter: {
      max: 10, // At most 10 jobs...
      duration: 60000, // ...per minute (rate limit)
    },
  }
);
worker.on("completed", (job) => {
  console.log(`[WORKER] Job ${job.id} completed`);
});
worker.on("failed", (job, error) => {
  console.error(`[WORKER] Job ${job?.id} failed: ${error.message}`);
});
API that enqueues jobs
// api/jobs.ts
import { Router, Request, Response } from "express";
import { agentQueue } from "../queue/worker";
import { v4 as uuidv4 } from "uuid";
const router = Router();
router.post("/jobs", async (req: Request, res: Response) => {
  const { task, model = "claude-sonnet-4-5", webhookUrl } = req.body;
  if (typeof task !== "string" || !task.trim()) {
    res.status(400).json({ error: "task is required" });
    return;
  }
  const userId = (req as any).user?.sub ?? "anonymous";
  const jobId = uuidv4();
  const job = await agentQueue.add(
    "agent-task",
    { jobId, task, model, webhookUrl, userId },
    {
      jobId, // Reuse our id as the BullMQ job id so the job can be fetched directly
      attempts: 3, // Up to 3 attempts in total if the job fails
      backoff: {
        type: "exponential",
        delay: 5000, // Base delay in ms; the wait grows exponentially between retries
      },
      removeOnComplete: 100, // Keep only the last 100 completed jobs
      removeOnFail: 50, // Keep only the last 50 failed jobs
    }
  );
  // Rough estimate that assumes about 30 s per job waiting in the queue
  const waitingCount = await agentQueue.getWaitingCount();
  res.status(201).json({
    jobId,
    queueJobId: job.id,
    status: "queued",
    estimatedWaitMs: waitingCount * 30000,
  });
});
router.get("/jobs/:jobId", async (req: Request, res: Response) => {
  const { jobId } = req.params;
  // Direct lookup by id; unlike scanning selected states, this also finds
  // jobs that are "delayed" while waiting for a retry
  const job = await agentQueue.getJob(jobId);
  if (!job) {
    res.status(404).json({ error: "Job not found" });
    return;
  }
  const state = await job.getState();
  const progress = job.progress;
  res.json({
    jobId,
    status: state,
    progress,
    result: state === "completed" ? job.returnvalue?.result : undefined,
    error: state === "failed" ? job.failedReason : undefined,
    createdAt: new Date(job.timestamp).toISOString(),
  });
});
export { router as jobsRouter };
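To see what `attempts: 3` with an exponential backoff and `delay: 5000` means in practice, the waits between retries can be worked out by hand. The sketch below assumes BullMQ's built-in exponential strategy without jitter, which its documentation describes as 2^(attempts - 1) × delay. `retryDelaysMs` is a hypothetical helper, not a BullMQ function.

```typescript
// Hypothetical helper that mirrors the documented exponential backoff formula.
function retryDelaysMs(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // A job with N attempts can be retried at most N - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(2 ** (attemptsMade - 1) * delayMs));
  }
  return delays;
}

// Waits (in ms) before the second and third attempts for the options used above.
const retrySchedule = retryDelaysMs(3, 5000);
```

Under that assumption, a job that keeps failing is retried after about 5 s and then about 10 s before it is marked as failed. This is worth keeping in mind when sizing client-side polling timeouts.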
Celery (Python) + Redis/RabbitMQ
# tasks.py
import asyncio
import httpx
from celery import Celery
from claude_code_sdk import query, ClaudeCodeOptions
from flask import jsonify, request
celery_app = Celery(
    "agent_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    result_expires=3600,
    task_track_started=True,
    worker_concurrency=5,
)
@celery_app.task(bind=True, max_retries=3)
def run_agent_task(self, task: str, model: str, webhook_url: str | None, user_id: str):
    """Celery task that runs the agent."""
    self.update_state(state="RUNNING", meta={"progress": 0})
    async def _run():
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
            max_turns=30,
            model=model,
        )
        result = ""
        async for message in query(prompt=task, options=options):
            if hasattr(message, "result"):
                result = message.result
        return result
    try:
        # Celery tasks are synchronous, so the async agent runs in its own event loop
        result = asyncio.run(_run())
    except Exception as exc:
        # Retry with exponential backoff: 5 s, 10 s, 20 s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 5)
    # Notify via webhook; a webhook failure must not re-run the agent
    if webhook_url:
        try:
            httpx.post(
                webhook_url,
                json={
                    "task_id": self.request.id,
                    "status": "completed",
                    "result": result,
                    "user_id": user_id,
                },
                timeout=10.0,
            )
        except httpx.HTTPError as webhook_error:
            print(f"[WORKER] Webhook error: {webhook_error}")
    return {"status": "completed", "result": result}
# Flask API that enqueues tasks (assumes `app` is the Flask application created earlier)
@app.route("/agent/jobs", methods=["POST"])
def create_celery_job():
    data = request.get_json()
    task = run_agent_task.delay(
        task=data["task"],
        model=data.get("model", "claude-sonnet-4-5"),
        webhook_url=data.get("webhook_url"),
        user_id=data.get("user_id", "anonymous"),
    )
    return jsonify({"task_id": task.id, "status": "queued"}), 201
@app.route("/agent/jobs/<task_id>")
def get_celery_job(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return jsonify({
        "task_id": task_id,
        "status": result.state,
        "result": result.result if result.ready() else None,
    })
8. GraphQL
GraphQL offers a more expressive API for complex systems where clients need control over which data they receive.
Resolver that calls the agent (strawberry - Python)
# graphql_schema.py
import asyncio
import uuid
import strawberry
from strawberry.fastapi import GraphQLRouter
from claude_code_sdk import query, ClaudeCodeOptions
@strawberry.type
class AgentResult:
    result: str
    cost_usd: float
    turns: int
@strawberry.type
class AgentJob:
    job_id: str
    status: str
    result: str | None = None
    error: str | None = None
# Strong references to running background jobs so they are not garbage-collected mid-run
_background_tasks: set[asyncio.Task] = set()
@strawberry.type
class Query:
    @strawberry.field
    async def run_agent(self, task: str, model: str = "claude-3-5-haiku-latest") -> AgentResult:
        """Resolver that runs the agent and responds only once it has finished."""
        options = ClaudeCodeOptions(
            allowed_tools=["View", "GlobTool"],
            max_turns=10,
            model=model,
        )
        result = ""
        total_cost = 0.0
        total_turns = 0
        async for message in query(prompt=task, options=options):
            if hasattr(message, "cost_usd") and message.cost_usd:
                total_cost += message.cost_usd
            if hasattr(message, "num_turns"):
                total_turns = message.num_turns
            if hasattr(message, "result"):
                result = message.result
        return AgentResult(result=result, cost_usd=total_cost, turns=total_turns)
@strawberry.type
class Mutation:
    @strawberry.mutation
    async def create_agent_job(
        self,
        task: str,
        model: str = "claude-sonnet-4-5",
        webhook_url: str | None = None,
    ) -> AgentJob:
        """Create an asynchronous job."""
        job_id = str(uuid.uuid4())
        # Launch in the background and return immediately
        background = asyncio.create_task(
            _run_job_background(job_id, task, model, webhook_url)
        )
        _background_tasks.add(background)
        background.add_done_callback(_background_tasks.discard)
        return AgentJob(job_id=job_id, status="pending")
async def _run_job_background(job_id: str, task: str, model: str, webhook_url: str | None):
    """Run the job in the background."""
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
        max_turns=25,
        model=model,
    )
    result = ""
    async for message in query(prompt=task, options=options):
        if hasattr(message, "result"):
            result = message.result
    # Update the job status in the DB...
schema = strawberry.Schema(query=Query, mutation=Mutation)
# FastAPI integration (assumes `app` is the FastAPI application created earlier)
graphql_app = GraphQLRouter(schema)
app.include_router(graphql_app, prefix="/graphql")
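From the client's side, the `create_agent_job` mutation is called with an ordinary GraphQL-over-HTTP request. The sketch below builds that request body, assuming strawberry's default camel-casing of names (`create_agent_job` is exposed as `createAgentJob`, `job_id` as `jobId`). `buildCreateJobRequest` is a hypothetical helper.

```typescript
// Hypothetical helper: builds the JSON body for a GraphQL-over-HTTP POST.
interface GraphQLRequest {
  query: string;
  variables: Record<string, unknown>;
}

function buildCreateJobRequest(task: string, webhookUrl?: string): GraphQLRequest {
  return {
    query: `
      mutation CreateJob($task: String!, $webhookUrl: String) {
        createAgentJob(task: $task, webhookUrl: $webhookUrl) {
          jobId
          status
        }
      }
    `,
    // webhookUrl is nullable in the schema, so null means "no webhook";
    // model is omitted so the server-side default applies.
    variables: { task, webhookUrl: webhookUrl ?? null },
  };
}

// Usage: serialize and POST to the /graphql endpoint with Content-Type: application/json.
const createJobBody = JSON.stringify(buildCreateJobRequest("Summarize README.md"));
```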
GraphQL with graphql-yoga (TypeScript)
// graphql/schema.ts
import { createSchema, createYoga } from "graphql-yoga";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type AgentResult {
      result: String!
      turns: Int!
    }
    type AgentJob {
      jobId: String!
      status: String!
      result: String
    }
    type Query {
      runAgent(task: String!, model: String): AgentResult!
    }
    type Mutation {
      createAgentJob(task: String!, model: String, webhookUrl: String): AgentJob!
    }
    type Subscription {
      agentStream(task: String!): AgentEvent!
    }
    type AgentEvent {
      type: String!
      content: String
      result: String
    }
  `,
  resolvers: {
    Query: {
      runAgent: async (
        _: unknown,
        { task, model = "claude-3-5-haiku-latest" }: { task: string; model?: string }
      ) => {
        const options: ClaudeCodeOptions = {
          allowedTools: ["View", "GlobTool"],
          maxTurns: 10,
          model,
        };
        let result = "";
        let turns = 0;
        for await (const message of query({ prompt: task, options })) {
          if (message.type === "result") {
            result = message.result;
            // The result message reports how many turns the agent used
            turns = message.num_turns;
          }
        }
        return { result, turns };
      },
    },
    Subscription: {
      agentStream: {
        subscribe: async function* (_: unknown, { task }: { task: string }) {
          const options: ClaudeCodeOptions = {
            allowedTools: ["View", "GlobTool"],
            maxTurns: 15,
          };
          for await (const message of query({ prompt: task, options })) {
            if (message.type === "assistant") {
              const content = (message as any).message?.content ?? [];
              for (const block of content) {
                if (block.type === "text" && block.text) {
                  yield {
                    agentStream: {
                      type: "text",
                      content: block.text,
                    },
                  };
                }
              }
            } else if (message.type === "result") {
              yield {
                agentStream: {
                  type: "done",
                  result: message.result,
                },
              };
            }
          }
        },
      },
    },
  },
});
export const yoga = createYoga({ schema });
GraphQL client for consuming subscriptions
// client/agent-client.ts
import { createClient } from "graphql-ws";
import WebSocket from "ws";
const client = createClient({
  url: "ws://localhost:4000/graphql",
  webSocketImpl: WebSocket,
});
async function* streamAgentViaGraphQL(task: string) {
  const subscription = client.iterate<{
    agentStream: { type: string; content?: string; result?: string };
  }>({
    query: /* GraphQL */ `
      subscription AgentStream($task: String!) {
        agentStream(task: $task) {
          type
          content
          result
        }
      }
    `,
    variables: { task },
  });
  for await (const event of subscription) {
    if (event.data) {
      yield event.data.agentStream;
    }
  }
}
// Usage
async function main() {
  const stream = streamAgentViaGraphQL("Analyze my Python code");
  for await (const event of stream) {
    if (event.type === "text" && event.content) {
      process.stdout.write(event.content);
    } else if (event.type === "done") {
      console.log("\n\n=== Completed ===");
    }
  }
}
main();
Summary: Choosing the right pattern
flowchart TD
A[How long does the agent take?] -->|"Under 10s"| B[Synchronous endpoint]
A -->|"10s to 5 minutes"| C[SSE or WebSocket]
A -->|"Over 5 minutes"| D[Queue with worker]
B -->|"Need streaming?"| E{Streaming}
E -->|No| F[POST → JSON response]
E -->|Yes| G[GET → SSE]
C -->|"Bidirectional?"| H{Bidirectional}
H -->|No| I[SSE: client ← server]
H -->|Yes| J[WebSocket: client ↔ server]
D -->|"Need notification?"| K{Notification}
K -->|Polling| L[GET /jobs/id]
K -->|Push| M[Webhook on completion]
| Pattern | Framework | Use case |
|---|---|---|
| Synchronous | FastAPI, Flask, Express, Hono | Short tasks (< 10s) |
| SSE | FastAPI, Express, Next.js | Long tasks with progress updates |
| WebSocket | Express + ws | Bidirectional conversation |
| Background Task | FastAPI BackgroundTasks | Tasks of up to 5 min |
| Queue (BullMQ) | Express + Redis | Long tasks, high concurrency |
| Queue (Celery) | Flask/FastAPI + Redis | Long tasks in Python |
| GraphQL Subscription | graphql-yoga | When you already use GraphQL |
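The flowchart and table above can also be written as a small function, which is handy for documenting the choice in code or for routing requests by expected duration. This is a sketch that uses the chapter's thresholds (10 seconds and 5 minutes). `choosePattern`, `Pattern`, and the option names are hypothetical.

```typescript
// Hypothetical encoding of the decision flow; names are illustrative.
type Pattern =
  | "sync-json"
  | "sse"
  | "websocket"
  | "queue-polling"
  | "queue-webhook";

interface Requirements {
  expectedSeconds: number;
  streaming?: boolean; // client wants incremental output
  bidirectional?: boolean; // client sends messages while the agent runs
  push?: boolean; // client can receive a webhook when the job completes
}

function choosePattern(req: Requirements): Pattern {
  if (req.expectedSeconds < 10) {
    return req.streaming ? "sse" : "sync-json";
  }
  if (req.expectedSeconds <= 300) {
    return req.bidirectional ? "websocket" : "sse";
  }
  return req.push ? "queue-webhook" : "queue-polling";
}

// Usage: a two-minute task with no client-to-server messages maps to SSE.
const suggested = choosePattern({ expectedSeconds: 120 });
```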
With these patterns you have the tools to integrate the Claude Agent SDK into any modern web application architecture. The next chapter covers advanced multi-agent and orchestration patterns.