Capítulo 11: Streaming y Eventos en Tiempo Real
Capítulo 11: Streaming y Eventos en Tiempo Real
El streaming transforma la experiencia del usuario con agentes de IA. En vez de esperar 30 segundos para ver un resultado, el usuario ve el progreso inmediatamente. Este capítulo cubre todo lo necesario para implementar streaming de producción.
1. ¿Por qué Streaming?
Latencia percibida vs latencia real
Hay una diferencia fundamental entre latencia real y latencia percibida. La latencia real es el tiempo desde que el usuario envía una petición hasta que el sistema tiene la respuesta completa. La latencia percibida es cuánto tiempo siente el usuario que espera.
El streaming no reduce la latencia real. El agente tarda lo mismo en completar su trabajo. Lo que cambia es la latencia percibida: el usuario empieza a ver resultados en milisegundos, lo que transforma una espera ansiosa en una experiencia interactiva.
Estudios de UX muestran que los usuarios toleran hasta 10 segundos de espera si ven progreso visible, pero abandonan después de 3 segundos si no hay feedback. El streaming resuelve este problema de raíz.
Sin streaming vs con streaming
sequenceDiagram
participant U as Usuario
participant A as Agente
participant C as Claude API
rect rgb(255, 220, 220)
Note over U,C: Sin Streaming
U->>A: "Refactoriza mi código"
A->>C: Petición completa
Note over C: Procesando... 30s
C->>A: Respuesta completa
A->>U: Resultado (30s después)
Note over U: ¿Está funcionando?
end
rect rgb(220, 255, 220)
Note over U,C: Con Streaming
U->>A: "Refactoriza mi código"
A->>C: Petición con stream=true
C-->>A: MessageStart
C-->>A: TextDelta "Analizando..."
C-->>A: ToolUse "Read archivo.py"
A-->>U: "Leyendo archivo..."
C-->>A: TextDelta "Encontré 3 problemas..."
A-->>U: "Encontré 3 problemas..."
C-->>A: ToolUse "Write archivo.py"
A-->>U: "Aplicando cambios..."
C-->>A: ResultMessage
A->>U: "¡Listo!" (feedback continuo)
end
La diferencia visual es dramática. Con streaming, el usuario sabe exactamente qué está haciendo el agente en cada momento.
El async generator del SDK ya es streaming nativo
Una de las grandes ventajas del Claude Code SDK es que ya implementa streaming internamente. La función query() retorna un async generator que emite mensajes a medida que llegan del proceso de Claude Code, no cuando termina.
from claude_code_sdk import query, ClaudeCodeOptions
async def main():
# query() retorna AsyncGenerator[Message, None]
# Cada iteración retorna cuando hay un nuevo mensaje disponible
# NO espera a que todo termine
async for message in query(
prompt="Analiza este proyecto",
options=ClaudeCodeOptions(cwd="/mi/proyecto")
):
# Este código corre tan pronto como llega un mensaje
print(f"Nuevo mensaje: {type(message).__name__}")
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
async function main() {
// query() retorna AsyncGenerator<SDKMessage>
// Cada yield ocurre cuando llega un nuevo mensaje
for await (const message of query({
prompt: "Analiza este proyecto",
options: { cwd: "/mi/proyecto" } as ClaudeCodeOptions,
})) {
// Este código corre tan pronto como llega un mensaje
console.log(`Nuevo mensaje: ${message.type}`);
}
}
Tipos de mensajes y su orden en el tiempo
El SDK emite mensajes en un orden predecible. Entender este orden es clave para implementar streaming efectivo:
sequenceDiagram
participant SDK as SDK Generator
participant App as Tu Aplicación
SDK-->>App: SystemMessage (configuración inicial)
Note over App: Mostrar "Iniciando agente..."
SDK-->>App: AssistantMessage (texto + herramientas)
Note over App: Mostrar texto parcial
SDK-->>App: AssistantMessage (tool_use: Bash)
Note over App: Mostrar "Ejecutando comando..."
SDK-->>App: AssistantMessage (más texto)
Note over App: Actualizar UI
SDK-->>App: ResultMessage (resumen final)
Note over App: Mostrar resultado final, costo
Los tipos principales son:
- SystemMessage: Metadatos del sistema, aparece primero
- AssistantMessage: Lo que dice Claude, puede contener texto o llamadas a herramientas
- ResultMessage: El resultado final con estadísticas de uso
Cada AssistantMessage puede contener múltiples content blocks, y cada block puede ser TextBlock (texto del asistente) o ToolUseBlock (llamada a una herramienta).
2. AssistantMessage Streaming
Cómo extraer texto en tiempo real
El AssistantMessage contiene una lista de content blocks. Para streaming de texto, necesitas iterar estos blocks y mostrarlos a medida que llegan.
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from claude_code_sdk.types import TextBlock, ToolUseBlock
async def stream_text(prompt: str, cwd: str):
"""Extrae y muestra texto en tiempo real."""
full_text = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if not isinstance(message, AssistantMessage):
continue
for block in message.content:
if isinstance(block, TextBlock):
# Texto del asistente - mostrar inmediatamente
print(block.text, end="", flush=True)
full_text.append(block.text)
elif isinstance(block, ToolUseBlock):
# El agente está usando una herramienta
print(f"\n[Usando: {block.name}]", flush=True)
print() # Nueva línea al final
return "".join(full_text)
import { query, ClaudeCodeOptions, AssistantMessage } from "@anthropic-ai/claude-code-sdk";
async function streamText(prompt: string, cwd: string): Promise<string> {
const fullText: string[] = [];
for await (const message of query({
prompt,
options: { cwd } as ClaudeCodeOptions,
})) {
if (message.type !== "assistant") continue;
const assistantMessage = message as AssistantMessage;
for (const block of assistantMessage.message.content) {
if (block.type === "text") {
process.stdout.write(block.text);
fullText.push(block.text);
} else if (block.type === "tool_use") {
process.stdout.write(`\n[Usando: ${block.name}]\n`);
}
}
}
process.stdout.write("\n");
return fullText.join("");
}
Content blocks: text vs tool_use
Un AssistantMessage puede contener bloques mezclados. Es normal que Claude alterne entre texto y llamadas a herramientas:
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from claude_code_sdk.types import TextBlock, ToolUseBlock
from dataclasses import dataclass
from typing import Union
@dataclass
class StreamEvent:
kind: str # "text" | "tool_start" | "tool_input"
data: Union[str, dict]
async def stream_with_events(prompt: str, cwd: str):
"""Stream con eventos estructurados."""
events = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if not isinstance(message, AssistantMessage):
continue
for block in message.content:
if isinstance(block, TextBlock):
event = StreamEvent(kind="text", data=block.text)
events.append(event)
yield event
elif isinstance(block, ToolUseBlock):
# Inicio de herramienta
start_event = StreamEvent(
kind="tool_start",
data={"name": block.name, "id": block.id}
)
events.append(start_event)
yield start_event
# Input de la herramienta
input_event = StreamEvent(
kind="tool_input",
data=block.input
)
events.append(input_event)
yield input_event
# Usar el generator
async def main():
async for event in stream_with_events("Refactoriza app.py", "/proyecto"):
if event.kind == "text":
print(event.data, end="", flush=True)
elif event.kind == "tool_start":
print(f"\n→ Herramienta: {event.data['name']}")
elif event.kind == "tool_input":
print(f" Input: {event.data}")
Mostrar progreso mientras el agente piensa
Entre mensajes puede haber pausas mientras Claude procesa. Un spinner o indicador de actividad mejora la UX:
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import sys
async def stream_with_thinking_indicator(prompt: str, cwd: str):
"""Muestra indicador mientras el agente procesa."""
thinking = False
spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
spinner_idx = 0
async def show_spinner():
nonlocal spinner_idx
while thinking:
char = spinner_chars[spinner_idx % len(spinner_chars)]
sys.stdout.write(f"\r{char} Pensando...")
sys.stdout.flush()
spinner_idx += 1
await asyncio.sleep(0.1)
spinner_task = None
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
# Parar spinner cuando llega mensaje
thinking = False
if spinner_task and not spinner_task.done():
spinner_task.cancel()
sys.stdout.write("\r" + " " * 20 + "\r") # Limpiar línea
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
print(block.text, end="", flush=True)
elif hasattr(block, "name"):
print(f"\n[{block.name}]", end=" ", flush=True)
elif isinstance(message, ResultMessage):
print(f"\n✓ Completado en {message.duration_ms}ms")
break
# Iniciar spinner para el siguiente mensaje
thinking = True
spinner_task = asyncio.create_task(show_spinner())
Buffer de texto parcial
En algunos casos necesitas acumular texto antes de procesarlo (por ejemplo, para buscar patrones completos):
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
import re
class TextBuffer:
"""Buffer para acumular y procesar texto parcial."""
def __init__(self, flush_on: str = "\n"):
self.buffer = ""
self.flush_on = flush_on
self.lines = []
def add(self, text: str) -> list[str]:
"""Agrega texto y retorna líneas completas."""
self.buffer += text
complete_lines = []
while self.flush_on in self.buffer:
idx = self.buffer.index(self.flush_on)
line = self.buffer[:idx]
complete_lines.append(line)
self.lines.append(line)
self.buffer = self.buffer[idx + len(self.flush_on):]
return complete_lines
def flush(self) -> str:
"""Vacía el buffer restante."""
remaining = self.buffer
self.buffer = ""
return remaining
async def stream_with_buffer(prompt: str, cwd: str):
buf = TextBuffer(flush_on="\n")
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
complete_lines = buf.add(block.text)
for line in complete_lines:
# Procesar línea completa (por ejemplo, colorear código)
if line.startswith("```"):
print(f"\033[33m{line}\033[0m") # Amarillo
elif line.startswith("#"):
print(f"\033[1m{line}\033[0m") # Negrita
else:
print(line)
# Procesar texto restante
remaining = buf.flush()
if remaining:
print(remaining)
import { query, ClaudeCodeOptions, AssistantMessage } from "@anthropic-ai/claude-code-sdk";
class TextBuffer {
private buffer = "";
private lines: string[] = [];
constructor(private readonly flushOn: string = "\n") {}
add(text: string): string[] {
this.buffer += text;
const completeLines: string[] = [];
while (this.buffer.includes(this.flushOn)) {
const idx = this.buffer.indexOf(this.flushOn);
const line = this.buffer.slice(0, idx);
completeLines.push(line);
this.lines.push(line);
this.buffer = this.buffer.slice(idx + this.flushOn.length);
}
return completeLines;
}
flush(): string {
const remaining = this.buffer;
this.buffer = "";
return remaining;
}
}
async function streamWithBuffer(prompt: string, cwd: string) {
const buf = new TextBuffer("\n");
for await (const message of query({
prompt,
options: { cwd } as ClaudeCodeOptions,
})) {
if (message.type !== "assistant") continue;
const assistantMessage = message as AssistantMessage;
for (const block of assistantMessage.message.content) {
if (block.type === "text") {
const lines = buf.add(block.text);
for (const line of lines) {
console.log(line);
}
}
}
}
const remaining = buf.flush();
if (remaining) console.log(remaining);
}
3. Terminal Output en Tiempo Real
Rich library para output bonito (Python)
La librería rich de Python transforma la salida de terminal con colores, tablas, progress bars y paneles. Es perfecta para agentes interactivos:
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.spinner import Spinner
from rich.text import Text
from rich.table import Table
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
console = Console()
async def rich_streaming_agent(prompt: str, cwd: str):
"""Agente con output rico en terminal."""
tool_usage = []
current_text = Text()
with Live(
Panel(Spinner("dots", text="Iniciando agente..."), title="Claude Agent"),
console=console,
refresh_per_second=10
) as live:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
current_text.append(block.text)
live.update(Panel(
current_text,
title="[bold green]Claude Agent[/]",
border_style="green"
))
elif hasattr(block, "name"):
tool_name = block.name
tool_usage.append(tool_name)
# Mostrar herramienta en uso
tools_table = Table(show_header=False, box=None)
for tool in tool_usage[-5:]: # Últimas 5
tools_table.add_row(f"→ [cyan]{tool}[/]")
from rich.columns import Columns
live.update(Panel(
Columns([current_text, tools_table]),
title="[bold green]Claude Agent[/]"
))
elif isinstance(message, ResultMessage):
# Mostrar resumen final
summary = Table(title="Resumen", show_header=True)
summary.add_column("Métrica")
summary.add_column("Valor", style="cyan")
summary.add_row("Duración", f"{message.duration_ms}ms")
summary.add_row("Herramientas", str(len(tool_usage)))
summary.add_row("Herramientas usadas", ", ".join(set(tool_usage)))
live.update(Panel(summary, title="[bold blue]Completado[/]"))
return message
Spinner mientras el agente trabaja
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
console = Console()
async def agent_with_spinner(prompt: str, cwd: str):
"""Agente con spinner elegante."""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TimeElapsedColumn(),
console=console,
transient=True # Se borra al terminar
) as progress:
task = progress.add_task("Iniciando agente...", total=None)
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "name"):
progress.update(
task,
description=f"Usando [cyan]{block.name}[/]..."
)
elif hasattr(block, "text") and block.text.strip():
# Mostrar primeras palabras del texto
preview = block.text[:50].replace("\n", " ")
progress.update(task, description=f"Pensando: {preview}...")
console.print("[green]✓ Agente completado[/green]")
TypeScript equivalent con chalk/ora
import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import ora, { Ora } from "ora";
import chalk from "chalk";
async function richStreamingAgent(prompt: string, cwd: string) {
const spinner: Ora = ora("Iniciando agente...").start();
const toolsUsed: string[] = [];
const outputLines: string[] = [];
try {
for await (const message of query({
prompt,
options: { cwd } as ClaudeCodeOptions,
})) {
if (message.type === "assistant") {
const assistantMessage = message as AssistantMessage;
for (const block of assistantMessage.message.content) {
if (block.type === "text" && block.text.trim()) {
spinner.stop();
const lines = block.text.split("\n");
for (const line of lines) {
if (line.startsWith("# ")) {
console.log(chalk.bold.blue(line));
} else if (line.startsWith("```")) {
console.log(chalk.yellow(line));
} else {
console.log(line);
}
outputLines.push(line);
}
spinner.start("Procesando...");
} else if (block.type === "tool_use") {
toolsUsed.push(block.name);
spinner.text = chalk.cyan(`Usando: ${block.name}`);
}
}
} else if (message.type === "result") {
const resultMessage = message as ResultMessage;
spinner.succeed(chalk.green("Agente completado"));
console.log("\n" + chalk.bold("=== Resumen ==="));
console.log(`Duración: ${chalk.cyan(resultMessage.duration_ms + "ms")}`);
console.log(`Herramientas: ${chalk.cyan(toolsUsed.join(", ") || "ninguna")}`);
}
}
} catch (error) {
spinner.fail(chalk.red(`Error: ${error}`));
throw error;
}
}
4. Server-Sent Events (SSE)
¿Qué son los SSE?
Server-Sent Events es un protocolo HTTP estándar para streaming unidireccional del servidor al cliente. Es ideal para agentes porque:
- Usa HTTP estándar (no necesita WebSocket)
- Reconexión automática del navegador
- Soporte nativo en todos los navegadores modernos
- Más simple que WebSockets para streaming unidireccional
sequenceDiagram
participant B as Navegador
participant S as Servidor SSE
participant A as Claude Agent
B->>S: GET /api/agent/run (Accept: text/event-stream)
S->>A: Iniciar agente
loop Streaming
A-->>S: Nuevo mensaje
S-->>B: data: {"type":"text","content":"..."}\n\n
B->>B: Actualizar UI
end
S-->>B: data: {"type":"done"}\n\n
B->>B: Mostrar resultado final
Exponer el agente como endpoint SSE con FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import json
import asyncio
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
async def agent_stream_generator(prompt: str, cwd: str):
"""Generator que emite eventos SSE."""
# Evento de inicio
yield f"data: {json.dumps({'type': 'start', 'prompt': prompt})}\n\n"
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
event = {
"type": "text",
"content": block.text
}
yield f"data: {json.dumps(event)}\n\n"
elif hasattr(block, "name"):
event = {
"type": "tool_use",
"tool": block.name,
"input": block.input if hasattr(block, "input") else {}
}
yield f"data: {json.dumps(event)}\n\n"
elif isinstance(message, ResultMessage):
event = {
"type": "result",
"subtype": message.subtype,
"duration_ms": message.duration_ms,
"cost_usd": message.cost_usd,
}
yield f"data: {json.dumps(event)}\n\n"
except Exception as e:
error_event = {"type": "error", "message": str(e)}
yield f"data: {json.dumps(error_event)}\n\n"
finally:
yield f"data: {json.dumps({'type': 'done'})}\n\n"
@app.post("/api/agent/run")
async def run_agent(request: Request):
body = await request.json()
prompt = body.get("prompt", "")
cwd = body.get("cwd", "/tmp")
return StreamingResponse(
agent_stream_generator(prompt, cwd),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Para nginx
}
)
# Ejecutar: uvicorn main:app --reload
Exponer como SSE con Express/Hono
import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import express, { Request, Response } from "express";
import cors from "cors";
const app = express();
app.use(cors());
app.use(express.json());
app.post("/api/agent/run", async (req: Request, res: Response) => {
const { prompt, cwd = "/tmp" } = req.body;
// Headers para SSE
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
const sendEvent = (data: object) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
sendEvent({ type: "start", prompt });
try {
for await (const message of query({
prompt,
options: { cwd } as ClaudeCodeOptions,
})) {
if (message.type === "assistant") {
const assistantMessage = message as AssistantMessage;
for (const block of assistantMessage.message.content) {
if (block.type === "text") {
sendEvent({ type: "text", content: block.text });
} else if (block.type === "tool_use") {
sendEvent({ type: "tool_use", tool: block.name, input: block.input });
}
}
} else if (message.type === "result") {
const resultMessage = message as ResultMessage;
sendEvent({
type: "result",
duration_ms: resultMessage.duration_ms,
});
}
}
} catch (error) {
sendEvent({ type: "error", message: String(error) });
} finally {
sendEvent({ type: "done" });
res.end();
}
});
app.listen(3000, () => console.log("SSE server on port 3000"));
Cliente SSE en el navegador
<!DOCTYPE html>
<html lang="es">
<head>
<meta charset="UTF-8">
<title>Claude Agent en Tiempo Real</title>
<style>
body { font-family: monospace; max-width: 800px; margin: 2rem auto; padding: 1rem; }
#output { background: #1e1e1e; color: #d4d4d4; padding: 1rem; min-height: 300px;
border-radius: 8px; white-space: pre-wrap; overflow-y: auto; }
.tool-use { color: #4ec9b0; }
.error { color: #f44747; }
.done { color: #608b4e; font-weight: bold; }
button { padding: 0.5rem 1rem; margin: 0.5rem; cursor: pointer; }
#prompt { width: 100%; padding: 0.5rem; margin-bottom: 1rem; }
</style>
</head>
<body>
<h1>Claude Agent - Streaming Demo</h1>
<textarea id="prompt" rows="3" placeholder="Escribe tu tarea...">Lista los archivos del directorio actual</textarea>
<br>
<button onclick="runAgent()">Ejecutar</button>
<button onclick="stopAgent()">Detener</button>
<div id="output"></div>
<script>
let eventSource = null;
const output = document.getElementById("output");
function appendToOutput(text, className = "") {
const span = document.createElement("span");
if (className) span.className = className;
span.textContent = text;
output.appendChild(span);
output.scrollTop = output.scrollHeight;
}
async function runAgent() {
if (eventSource) eventSource.close();
output.innerHTML = "";
const prompt = document.getElementById("prompt").value;
// SSE via POST necesita fetch + ReadableStream
const response = await fetch("/api/agent/run", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt, cwd: "/tmp" })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n\n");
buffer = lines.pop() || "";
for (const chunk of lines) {
if (!chunk.startsWith("data: ")) continue;
const jsonStr = chunk.slice(6);
try {
const event = JSON.parse(jsonStr);
handleEvent(event);
} catch (e) {
console.error("Parse error:", e);
}
}
}
}
function handleEvent(event) {
switch (event.type) {
case "start":
appendToOutput(`> ${event.prompt}\n\n`);
break;
case "text":
appendToOutput(event.content);
break;
case "tool_use":
appendToOutput(`\n[${event.tool}]\n`, "tool-use");
break;
case "error":
appendToOutput(`\nERROR: ${event.message}\n`, "error");
break;
case "done":
appendToOutput("\n\n--- Completado ---\n", "done");
break;
}
}
function stopAgent() {
if (eventSource) {
eventSource.close();
appendToOutput("\n[Detenido por usuario]\n", "error");
}
}
</script>
</body>
</html>
Reconexión automática en SSE
Para SSE nativo (GET), el navegador reconecta automáticamente. Para POST-based SSE, necesitas implementarlo manualmente:
class ResilientAgentStream {
constructor(prompt, cwd, onEvent) {
this.prompt = prompt;
this.cwd = cwd;
this.onEvent = onEvent;
this.retryCount = 0;
this.maxRetries = 3;
this.retryDelay = 1000;
this.abortController = null;
}
async start() {
while (this.retryCount <= this.maxRetries) {
try {
await this.connect();
break; // Éxito, salir del loop
} catch (error) {
this.retryCount++;
if (this.retryCount > this.maxRetries) {
this.onEvent({ type: "error", message: "Max retries reached" });
break;
}
const delay = this.retryDelay * Math.pow(2, this.retryCount - 1);
this.onEvent({ type: "reconnecting", attempt: this.retryCount, delay });
await new Promise(r => setTimeout(r, delay));
}
}
}
async connect() {
this.abortController = new AbortController();
const response = await fetch("/api/agent/run", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt: this.prompt, cwd: this.cwd }),
signal: this.abortController.signal
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Procesar eventos...
}
}
stop() {
if (this.abortController) {
this.abortController.abort();
}
}
}
5. WebSockets
Agente como servidor WebSocket
WebSockets permiten comunicación bidireccional: el usuario puede enviar mensajes mientras el agente está corriendo, o múltiples clientes pueden observar el mismo agente.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import json
import asyncio
app = FastAPI()
class AgentSession:
"""Sesión de agente con WebSocket."""
def __init__(self, websocket: WebSocket, session_id: str):
self.websocket = websocket
self.session_id = session_id
self.active = True
async def send(self, event: dict):
if self.active:
await self.websocket.send_text(json.dumps(event))
async def run_agent(self, prompt: str, cwd: str):
await self.send({"type": "agent_start", "session_id": self.session_id})
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
await self.send({
"type": "text",
"content": block.text
})
elif hasattr(block, "name"):
await self.send({
"type": "tool_use",
"tool": block.name
})
elif isinstance(message, ResultMessage):
await self.send({
"type": "agent_done",
"duration_ms": message.duration_ms,
"cost_usd": message.cost_usd,
})
except asyncio.CancelledError:
await self.send({"type": "agent_cancelled"})
except Exception as e:
await self.send({"type": "error", "message": str(e)})
sessions: dict[str, AgentSession] = {}
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
await websocket.accept()
session = AgentSession(websocket, session_id)
sessions[session_id] = session
agent_task = None
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["action"] == "start_agent":
if agent_task and not agent_task.done():
agent_task.cancel()
agent_task = asyncio.create_task(
session.run_agent(
prompt=message["prompt"],
cwd=message.get("cwd", "/tmp")
)
)
elif message["action"] == "cancel_agent":
if agent_task and not agent_task.done():
agent_task.cancel()
await session.send({"type": "cancelled"})
except WebSocketDisconnect:
if agent_task and not agent_task.done():
agent_task.cancel()
del sessions[session_id]
Múltiples clientes viendo el mismo agente
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import json
import asyncio
from typing import set as Set
app = FastAPI()
class BroadcastSession:
"""Un agente que puede tener múltiples observadores."""
def __init__(self, session_id: str):
self.session_id = session_id
self.observers: set[WebSocket] = set()
self.agent_task: asyncio.Task | None = None
self.messages: list[dict] = [] # Historial para nuevos observadores
async def add_observer(self, websocket: WebSocket):
"""Agrega un observador y le envía el historial."""
self.observers.add(websocket)
# Enviar historial de mensajes previos
for msg in self.messages:
await websocket.send_text(json.dumps({**msg, "replay": True}))
def remove_observer(self, websocket: WebSocket):
self.observers.discard(websocket)
async def broadcast(self, event: dict):
"""Envía un evento a todos los observadores."""
self.messages.append(event)
dead_connections = set()
for ws in self.observers.copy():
try:
await ws.send_text(json.dumps(event))
except Exception:
dead_connections.add(ws)
self.observers -= dead_connections
async def run_agent(self, prompt: str, cwd: str):
await self.broadcast({"type": "start", "prompt": prompt})
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
await self.broadcast({"type": "text", "content": block.text})
elif hasattr(block, "name"):
await self.broadcast({"type": "tool", "name": block.name})
elif isinstance(message, ResultMessage):
await self.broadcast({"type": "done", "cost": message.cost_usd})
broadcast_sessions: dict[str, BroadcastSession] = {}
@app.websocket("/ws/watch/{session_id}")
async def watch_session(websocket: WebSocket, session_id: str):
"""Observar una sesión existente."""
await websocket.accept()
if session_id not in broadcast_sessions:
await websocket.send_text(json.dumps({"type": "error", "message": "Sesión no encontrada"}))
await websocket.close()
return
session = broadcast_sessions[session_id]
await session.add_observer(websocket)
try:
while True:
await websocket.receive_text() # Mantener viva la conexión
except WebSocketDisconnect:
session.remove_observer(websocket)
TypeScript con ws library
import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import { WebSocketServer, WebSocket } from "ws";
import { IncomingMessage } from "http";
interface ClientMessage {
action: "start_agent" | "cancel_agent";
prompt?: string;
cwd?: string;
}
const wss = new WebSocketServer({ port: 8080 });
wss.on("connection", (ws: WebSocket, req: IncomingMessage) => {
console.log("Cliente conectado");
let agentController: AbortController | null = null;
ws.on("message", async (data: Buffer) => {
const message: ClientMessage = JSON.parse(data.toString());
if (message.action === "start_agent" && message.prompt) {
// Cancelar agente anterior si existe
agentController?.abort();
agentController = new AbortController();
const sendEvent = (event: object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(event));
}
};
sendEvent({ type: "start" });
try {
for await (const msg of query({
prompt: message.prompt,
options: { cwd: message.cwd ?? "/tmp" } as ClaudeCodeOptions,
})) {
if (agentController.signal.aborted) break;
if (msg.type === "assistant") {
const assistantMsg = msg as AssistantMessage;
for (const block of assistantMsg.message.content) {
if (block.type === "text") {
sendEvent({ type: "text", content: block.text });
} else if (block.type === "tool_use") {
sendEvent({ type: "tool_use", tool: block.name });
}
}
} else if (msg.type === "result") {
const resultMsg = msg as ResultMessage;
sendEvent({ type: "done", duration_ms: resultMsg.duration_ms });
}
}
} catch (error) {
if (!agentController.signal.aborted) {
sendEvent({ type: "error", message: String(error) });
}
}
} else if (message.action === "cancel_agent") {
agentController?.abort();
ws.send(JSON.stringify({ type: "cancelled" }));
}
});
ws.on("close", () => {
agentController?.abort();
console.log("Cliente desconectado");
});
});
console.log("WebSocket server en puerto 8080");
6. Streaming a Fichero de Log
Escribir cada evento a un archivo en tiempo real
Para jobs largos en background, es útil hacer log de cada evento inmediatamente:
import json
import asyncio
from pathlib import Path
from datetime import datetime
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
async def agent_with_file_log(prompt: str, cwd: str, log_path: str):
"""Agente que escribe cada evento a un archivo de log."""
log_file = Path(log_path)
log_file.parent.mkdir(parents=True, exist_ok=True)
with open(log_file, "w", buffering=1) as f: # buffering=1 = line buffered
def log(event: dict):
entry = {
"timestamp": datetime.now().isoformat(),
**event
}
f.write(json.dumps(entry) + "\n")
log({"type": "session_start", "prompt": prompt, "cwd": cwd})
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
log({"type": "text", "content": block.text})
elif hasattr(block, "name"):
log({"type": "tool_use", "tool": block.name})
elif isinstance(message, ResultMessage):
log({
"type": "session_end",
"duration_ms": message.duration_ms,
"cost_usd": message.cost_usd,
})
return log_file
# Lanzar en background y seguir el log
async def run_background_agent():
log_path = f"/tmp/agent_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
# Lanzar agente en background
task = asyncio.create_task(
agent_with_file_log(
prompt="Analiza y documenta todo el proyecto",
cwd="/mi/proyecto",
log_path=log_path
)
)
print(f"Agente corriendo en background. Log: {log_path}")
print(f"Sigue el log con: tail -f {log_path} | python -m json.tool")
return task
Tail del archivo mientras corre
import asyncio
from pathlib import Path
import json
async def tail_log(log_path: str):
"""Sigue un archivo de log en tiempo real."""
path = Path(log_path)
# Esperar a que el archivo exista
while not path.exists():
await asyncio.sleep(0.1)
with open(path, "r") as f:
# Ir al final
f.seek(0, 2)
while True:
line = f.readline()
if not line:
await asyncio.sleep(0.05)
continue
try:
event = json.loads(line.strip())
event_type = event.get("type", "")
if event_type == "text":
print(event["content"], end="", flush=True)
elif event_type == "tool_use":
print(f"\n[{event['tool']}]", end=" ", flush=True)
elif event_type == "session_end":
print(f"\n\n✓ Completado en {event['duration_ms']}ms")
break
except json.JSONDecodeError:
pass
7. Progress Tracking
Estimar progreso de una tarea larga
El agente no siempre sabe cuánto falta, pero podemos estimar progreso basándonos en heurísticas:
from dataclasses import dataclass, field
from typing import Optional
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
@dataclass
class ProgressTracker:
"""Rastrea el progreso estimado de un agente."""
total_steps: int = 10 # Estimado
current_step: int = 0
tool_calls: list[str] = field(default_factory=list)
start_time: float = field(default_factory=lambda: __import__("time").time())
def on_tool_use(self, tool_name: str):
self.tool_calls.append(tool_name)
# Cada tool use cuenta como progreso
self.current_step = min(self.current_step + 1, self.total_steps - 1)
def on_text(self, text: str):
# Palabras clave que indican progreso
milestones = {
"analizando": 1,
"encontré": 2,
"modificando": 3,
"completado": 4,
"terminado": 4,
}
text_lower = text.lower()
for keyword, bonus in milestones.items():
if keyword in text_lower:
self.current_step = min(self.current_step + bonus, self.total_steps - 1)
@property
def percentage(self) -> float:
return (self.current_step / self.total_steps) * 100
@property
def elapsed_seconds(self) -> float:
return __import__("time").time() - self.start_time
async def agent_with_progress(prompt: str, cwd: str):
from tqdm.asyncio import tqdm
tracker = ProgressTracker(total_steps=20)
with tqdm(total=100, desc="Progreso", unit="%", bar_format="{l_bar}{bar}| {n:.0f}%") as pbar:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
tracker.on_text(block.text)
elif hasattr(block, "name"):
tracker.on_tool_use(block.name)
pbar.set_postfix(tool=block.name)
new_pct = tracker.percentage
delta = new_pct - pbar.n
if delta > 0:
pbar.update(delta)
elif isinstance(message, ResultMessage):
pbar.update(100 - pbar.n) # Completar al 100%
pbar.set_postfix(done=True)
TypeScript con cli-progress
import { query, ClaudeCodeOptions, AssistantMessage } from "@anthropic-ai/claude-code-sdk";
import cliProgress from "cli-progress";
import chalk from "chalk";
async function agentWithProgress(prompt: string, cwd: string) {
const bar = new cliProgress.SingleBar(
{
format: `Claude Agent |${chalk.cyan("{bar}")}| {percentage}% | {tool}`,
barCompleteChar: "\u2588",
barIncompleteChar: "\u2591",
},
cliProgress.Presets.shades_classic
);
bar.start(100, 0, { tool: "iniciando..." });
let progress = 0;
const increment = (amount: number, tool = "") => {
progress = Math.min(progress + amount, 99);
bar.update(progress, { tool });
};
for await (const message of query({
prompt,
options: { cwd } as ClaudeCodeOptions,
})) {
if (message.type === "assistant") {
const assistantMessage = message as AssistantMessage;
for (const block of assistantMessage.message.content) {
if (block.type === "tool_use") {
increment(5, block.name);
} else if (block.type === "text") {
increment(2, "procesando...");
}
}
} else if (message.type === "result") {
bar.update(100, { tool: "completado" });
}
}
bar.stop();
}
Notificaciones de hitos
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from typing import Callable
async def agent_with_milestones(
prompt: str,
cwd: str,
on_milestone: Callable[[str, dict], None]
):
"""Agente que notifica en hitos clave."""
tool_count = 0
text_accumulator = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "name"):
tool_count += 1
# Hito: primera herramienta
if tool_count == 1:
on_milestone("first_tool", {"tool": block.name})
# Hito: 5 herramientas (tarea compleja)
if tool_count == 5:
on_milestone("complex_task", {"tool_count": tool_count})
elif hasattr(block, "text"):
text_accumulator.append(block.text)
full_text = " ".join(text_accumulator).lower()
if "error" in full_text and "corregido" not in full_text:
on_milestone("error_detected", {"text": block.text})
if any(w in full_text for w in ["completado", "listo", "terminado"]):
on_milestone("near_completion", {})
# Uso
def handle_milestone(name: str, data: dict):
icons = {
"first_tool": "🔧",
"complex_task": "⚙️",
"error_detected": "⚠️",
"near_completion": "✅"
}
print(f"\n{icons.get(name, '📍')} Hito: {name} - {data}")
import asyncio
asyncio.run(agent_with_milestones(
"Refactoriza el proyecto completo",
"/mi/proyecto",
handle_milestone
))
8. Cancelación de Streams
AbortController en TypeScript
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
async function cancellableAgent(
prompt: string,
cwd: string,
timeoutMs: number = 60000
): Promise<void> {
const controller = new AbortController();
// Auto-cancelar después del timeout
const timeoutId = setTimeout(() => {
controller.abort(new Error(`Timeout after ${timeoutMs}ms`));
}, timeoutMs);
try {
for await (const message of query({
prompt,
options: { cwd } as ClaudeCodeOptions,
})) {
// Verificar si fue cancelado
if (controller.signal.aborted) {
console.log("Agente cancelado");
break;
}
// Procesar mensaje...
console.log(message.type);
}
} catch (error) {
if (controller.signal.aborted) {
console.log("Stream cancelado limpiamente");
} else {
throw error;
}
} finally {
clearTimeout(timeoutId);
// Limpiar recursos aquí
}
}
// Cancelación manual desde UI
const controller = new AbortController();
// En un botón "Cancelar":
function onCancelClick() {
controller.abort(new Error("Usuario canceló"));
}
asyncio.CancelledError en Python
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def cancellable_agent(prompt: str, cwd: str):
"""Agente que maneja cancelación limpiamente."""
task = asyncio.current_task()
cleanup_done = False
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
# Procesar mensaje
print(f"Mensaje: {type(message).__name__}")
except asyncio.CancelledError:
print("Agente cancelado, limpiando recursos...")
cleanup_done = True
# Aquí: guardar estado, cerrar conexiones, etc.
raise # Importante: re-raise para que asyncio sepa que fue cancelado
finally:
if not cleanup_done:
print("Limpieza normal")
# Siempre ejecutado: cerrar archivos, conexiones, etc.
async def main_with_cancel():
task = asyncio.create_task(
cancellable_agent("Tarea muy larga", "/proyecto")
)
# Cancelar después de 10 segundos
await asyncio.sleep(10)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Tarea cancelada exitosamente")
Graceful shutdown del agente
import asyncio
import signal
from claude_code_sdk import query, ClaudeCodeOptions
class GracefulAgent:
def __init__(self):
self._shutdown = asyncio.Event()
self._current_task: asyncio.Task | None = None
def setup_signal_handlers(self):
"""Capturar Ctrl+C y SIGTERM."""
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, self._handle_shutdown)
def _handle_shutdown(self):
print("\nShutdown solicitado...")
self._shutdown.set()
if self._current_task:
self._current_task.cancel()
async def run(self, prompt: str, cwd: str):
self.setup_signal_handlers()
async def run_agent():
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
if self._shutdown.is_set():
break
print(f"[{type(message).__name__}]")
self._current_task = asyncio.create_task(run_agent())
try:
await self._current_task
except asyncio.CancelledError:
print("Agente detenido gracefully")
finally:
print("Recursos liberados")
agent = GracefulAgent()
asyncio.run(agent.run("Tarea larga", "/proyecto"))
9. Backpressure y Buffers
Qué pasa si el consumidor es más lento que el productor
El SDK de Claude Code produce mensajes a la velocidad de la API. Si tu código de procesamiento es lento, puedes acumular mensajes en memoria. Hay que gestionar esto:
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def agent_with_queue(prompt: str, cwd: str):
"""Usa una cola para manejar backpressure."""
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
async def producer():
"""Lee del SDK y pone en cola."""
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
await queue.put(message)
finally:
await queue.put(None) # Señal de fin
async def consumer():
"""Procesa mensajes de la cola (puede ser lento)."""
while True:
message = await queue.get()
if message is None:
break
# Simulación de procesamiento lento
await asyncio.sleep(0.01)
print(f"Procesando: {type(message).__name__}")
queue.task_done()
# Correr productor y consumidor concurrentemente
await asyncio.gather(producer(), consumer())
async def agent_with_rate_limit(prompt: str, cwd: str, max_messages_per_second: float = 10):
"""Limita la tasa de procesamiento."""
interval = 1.0 / max_messages_per_second
last_processed = 0.0
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
import time
now = time.time()
elapsed = now - last_processed
if elapsed < interval:
await asyncio.sleep(interval - elapsed)
# Procesar mensaje
print(f"Mensaje: {type(message).__name__}")
last_processed = time.time()
Memory management
import gc
from typing import AsyncGenerator
from claude_code_sdk import query, ClaudeCodeOptions
class MemoryEfficientStreamer:
"""Procesa el stream sin acumular en memoria."""
def __init__(self, max_buffer_mb: float = 10):
self.max_buffer_bytes = max_buffer_mb * 1024 * 1024
self._buffer_size = 0
async def stream(
self,
prompt: str,
cwd: str
) -> AsyncGenerator[dict, None]:
buffer = []
buffer_bytes = 0
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(cwd=cwd)
):
# Estimar tamaño
import sys
msg_size = sys.getsizeof(message)
buffer_bytes += msg_size
# Si el buffer está lleno, vaciar
if buffer_bytes > self.max_buffer_bytes:
for buffered in buffer:
yield buffered
buffer.clear()
buffer_bytes = 0
gc.collect() # Forzar garbage collection
buffer.append(self._extract_data(message))
# Vaciar buffer restante
for buffered in buffer:
yield buffered
def _extract_data(self, message) -> dict:
"""Extrae solo los datos necesarios, descartando el resto."""
from claude_code_sdk import AssistantMessage
if isinstance(message, AssistantMessage):
texts = []
tools = []
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
elif hasattr(block, "name"):
tools.append(block.name)
return {"type": "assistant", "texts": texts, "tools": tools}
return {"type": type(message).__name__}
10. Ejemplo Completo: Chat en Tiempo Real
Este ejemplo combina todo lo aprendido en un sistema de chat completo con backend y frontend.
Backend FastAPI (Python)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
from dataclasses import dataclass, field
from typing import Optional
import json
import asyncio
import uuid
from datetime import datetime
app = FastAPI(title="Claude Chat en Tiempo Real")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@dataclass
class ChatMessage:
id: str
role: str # "user" | "assistant"
content: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
tool_calls: list[str] = field(default_factory=list)
@dataclass
class ChatSession:
id: str
cwd: str
messages: list[ChatMessage] = field(default_factory=list)
connections: set[WebSocket] = field(default_factory=set)
current_agent_task: Optional[asyncio.Task] = None
def add_connection(self, ws: WebSocket):
self.connections.add(ws)
def remove_connection(self, ws: WebSocket):
self.connections.discard(ws)
async def broadcast(self, event: dict):
dead = set()
for ws in self.connections.copy():
try:
await ws.send_text(json.dumps(event))
except Exception:
dead.add(ws)
self.connections -= dead
async def run_agent(self, user_message: str):
user_msg = ChatMessage(
id=str(uuid.uuid4()),
role="user",
content=user_message
)
self.messages.append(user_msg)
await self.broadcast({"type": "user_message", "message": user_msg.__dict__})
assistant_msg = ChatMessage(
id=str(uuid.uuid4()),
role="assistant",
content=""
)
await self.broadcast({"type": "assistant_start", "message_id": assistant_msg.id})
try:
async for message in query(
prompt=user_message,
options=ClaudeCodeOptions(cwd=self.cwd)
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
assistant_msg.content += block.text
await self.broadcast({
"type": "text_delta",
"message_id": assistant_msg.id,
"delta": block.text
})
elif hasattr(block, "name"):
assistant_msg.tool_calls.append(block.name)
await self.broadcast({
"type": "tool_use",
"message_id": assistant_msg.id,
"tool": block.name
})
elif isinstance(message, ResultMessage):
await self.broadcast({
"type": "assistant_done",
"message_id": assistant_msg.id,
"duration_ms": message.duration_ms,
"cost_usd": message.cost_usd,
})
except asyncio.CancelledError:
assistant_msg.content += "\n[Cancelado]"
await self.broadcast({"type": "assistant_cancelled", "message_id": assistant_msg.id})
self.messages.append(assistant_msg)
sessions: dict[str, ChatSession] = {}
@app.post("/api/sessions")
async def create_session(cwd: str = "/tmp"):
session_id = str(uuid.uuid4())
sessions[session_id] = ChatSession(id=session_id, cwd=cwd)
return {"session_id": session_id}
@app.get("/api/sessions/{session_id}/history")
async def get_history(session_id: str):
if session_id not in sessions:
raise HTTPException(status_code=404, detail="Sesión no encontrada")
return {"messages": [m.__dict__ for m in sessions[session_id].messages]}
@app.websocket("/ws/{session_id}")
async def websocket_chat(websocket: WebSocket, session_id: str):
if session_id not in sessions:
await websocket.close(code=4004, reason="Sesión no encontrada")
return
session = sessions[session_id]
await websocket.accept()
session.add_connection(websocket)
# Enviar historial
for msg in session.messages:
await websocket.send_text(json.dumps({"type": "history", "message": msg.__dict__}))
try:
while True:
data = await websocket.receive_text()
event = json.loads(data)
if event["type"] == "send_message":
if session.current_agent_task and not session.current_agent_task.done():
session.current_agent_task.cancel()
session.current_agent_task = asyncio.create_task(
session.run_agent(event["content"])
)
elif event["type"] == "cancel":
if session.current_agent_task:
session.current_agent_task.cancel()
except WebSocketDisconnect:
session.remove_connection(websocket)
Frontend completo HTML/JS
<!DOCTYPE html>
<html lang="es">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Claude Chat en Tiempo Real</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, sans-serif; display: flex;
flex-direction: column; height: 100vh; background: #0f0f0f; color: #e0e0e0; }
#header { background: #1a1a1a; padding: 1rem; border-bottom: 1px solid #333;
display: flex; justify-content: space-between; align-items: center; }
#messages { flex: 1; overflow-y: auto; padding: 1rem; display: flex; flex-direction: column; gap: 1rem; }
.message { max-width: 80%; padding: 0.75rem 1rem; border-radius: 12px; }
.message.user { align-self: flex-end; background: #2563eb; }
.message.assistant { align-self: flex-start; background: #1e1e1e;
border: 1px solid #333; white-space: pre-wrap; }
.tool-indicator { font-size: 0.75rem; color: #4ec9b0; margin-top: 0.5rem; }
.typing-indicator { display: flex; gap: 4px; padding: 0.5rem; }
.typing-dot { width: 8px; height: 8px; background: #4ec9b0; border-radius: 50%;
animation: typing 1s infinite; }
.typing-dot:nth-child(2) { animation-delay: 0.2s; }
.typing-dot:nth-child(3) { animation-delay: 0.4s; }
@keyframes typing { 0%, 60%, 100% { opacity: 0.3; } 30% { opacity: 1; } }
#input-area { background: #1a1a1a; padding: 1rem; border-top: 1px solid #333;
display: flex; gap: 0.5rem; }
#input { flex: 1; background: #2a2a2a; border: 1px solid #444; color: #e0e0e0;
border-radius: 8px; padding: 0.75rem; font-size: 1rem; resize: none; }
button { background: #2563eb; color: white; border: none; padding: 0.75rem 1.5rem;
border-radius: 8px; cursor: pointer; font-size: 1rem; }
button:disabled { opacity: 0.5; cursor: not-allowed; }
#cancel-btn { background: #dc2626; }
#status { font-size: 0.8rem; color: #666; }
</style>
</head>
<body>
<div id="header">
<h2>Claude Agent Chat</h2>
<span id="status">Desconectado</span>
</div>
<div id="messages"></div>
<div id="input-area">
<textarea id="input" rows="2" placeholder="Escribe tu mensaje..."></textarea>
<button id="send-btn" onclick="sendMessage()">Enviar</button>
<button id="cancel-btn" onclick="cancelAgent()" disabled>Cancelar</button>
</div>
<script>
let ws = null;
let sessionId = null;
let currentAssistantEl = null;
const messagesEl = document.getElementById("messages");
const statusEl = document.getElementById("status");
const sendBtn = document.getElementById("send-btn");
const cancelBtn = document.getElementById("cancel-btn");
async function init() {
const resp = await fetch("/api/sessions", { method: "POST" });
const data = await resp.json();
sessionId = data.session_id;
connectWS();
}
function connectWS() {
ws = new WebSocket(`ws://localhost:8000/ws/${sessionId}`);
ws.onopen = () => {
statusEl.textContent = "Conectado";
statusEl.style.color = "#4ec9b0";
};
ws.onclose = () => {
statusEl.textContent = "Desconectado - reconectando...";
statusEl.style.color = "#f44747";
setTimeout(connectWS, 2000);
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
handleWSEvent(data);
};
}
function handleWSEvent(event) {
switch (event.type) {
case "user_message":
addMessage("user", event.message.content);
break;
case "assistant_start":
currentAssistantEl = addMessage("assistant", "");
sendBtn.disabled = true;
cancelBtn.disabled = false;
showTypingIndicator();
break;
case "text_delta":
if (currentAssistantEl) {
hideTypingIndicator();
const textEl = currentAssistantEl.querySelector(".text-content");
textEl.textContent += event.delta;
messagesEl.scrollTop = messagesEl.scrollHeight;
}
break;
case "tool_use":
if (currentAssistantEl) {
const toolEl = currentAssistantEl.querySelector(".tool-indicator") ||
createToolIndicator(currentAssistantEl);
toolEl.textContent += `[${event.tool}] `;
}
break;
case "assistant_done":
case "assistant_cancelled":
sendBtn.disabled = false;
cancelBtn.disabled = true;
currentAssistantEl = null;
break;
case "history":
addMessage(event.message.role, event.message.content);
break;
}
}
function addMessage(role, content) {
const el = document.createElement("div");
el.className = `message ${role}`;
el.innerHTML = `
<div class="text-content">${content}</div>
${role === "assistant" ? '<div class="tool-indicator"></div>' : ""}
`;
messagesEl.appendChild(el);
messagesEl.scrollTop = messagesEl.scrollHeight;
return el;
}
let typingEl = null;
function showTypingIndicator() {
typingEl = document.createElement("div");
typingEl.className = "message assistant";
typingEl.innerHTML = `
<div class="typing-indicator">
<div class="typing-dot"></div>
<div class="typing-dot"></div>
<div class="typing-dot"></div>
</div>
`;
messagesEl.appendChild(typingEl);
}
function hideTypingIndicator() {
if (typingEl) {
typingEl.remove();
typingEl = null;
}
}
function createToolIndicator(parent) {
const el = parent.querySelector(".tool-indicator");
return el;
}
function sendMessage() {
const input = document.getElementById("input");
const content = input.value.trim();
if (!content || !ws) return;
ws.send(JSON.stringify({ type: "send_message", content }));
input.value = "";
}
function cancelAgent() {
if (ws) ws.send(JSON.stringify({ type: "cancel" }));
}
document.getElementById("input").addEventListener("keydown", (e) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
});
init();
</script>
</body>
</html>
Este ejemplo completo implementa un chat en tiempo real con:
- Historial de sesión persistente
- Múltiples usuarios pueden conectarse al mismo chat
- Indicador de escritura mientras el agente procesa
- Cancelación del agente en tiempo real
- Reconexión automática del WebSocket
- Visualización de herramientas usadas
Resumen del Capítulo
mindmap
root((Streaming))
Por qué
Latencia percibida
Experiencia usuario
Feedback inmediato
SDK Nativo
AsyncGenerator
AssistantMessage
ResultMessage
Terminal
Rich library
Spinners
Progress bars
Web
SSE
FastAPI
Express
Cliente HTML
WebSocket
Bidireccional
Múltiples clientes
Reconexión
Avanzado
Cancelación
Backpressure
File logging
Progress tracking
En el próximo capítulo aprenderás a testear estos agentes de streaming de forma robusta y sin gastar en llamadas API reales.