Chapter 11: Streaming and Real-Time Events

By: Artiko

Streaming transforms the user experience of AI agents. Instead of waiting 30 seconds to see a result, the user sees progress right away. This chapter covers what you need to implement production-grade streaming.


1. Why Streaming?

Perceived latency vs. actual latency

There is a fundamental difference between actual latency and perceived latency. Actual latency is the time from when the user sends a request until the system has the complete response. Perceived latency is how long the user feels they are waiting.

Streaming does not reduce actual latency. The agent takes just as long to finish its work. What changes is perceived latency: the user starts seeing output as soon as the first message arrives, which turns an anxious wait into an interactive experience.

UX studies suggest that users tolerate up to about 10 seconds of waiting when they see visible progress, but tend to abandon after roughly 3 seconds without feedback. Streaming addresses this problem at its root.
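
The difference between the two latencies can be made concrete with a small simulation. This is only a sketch: `fake_agent` is a hypothetical stand-in that yields a chunk every few milliseconds, not the SDK, and the timings are illustrative. It measures the time to the first chunk (roughly what the user perceives) against the time to the last one (actual latency).

```python
import asyncio
import time


async def fake_agent(chunks: int = 5, delay: float = 0.02):
    """Hypothetical stand-in for an agent: yields one chunk every `delay` seconds."""
    for i in range(chunks):
        await asyncio.sleep(delay)
        yield f"chunk {i}"


async def measure() -> tuple[float, float]:
    """Return (time to first chunk, total time), both in seconds."""
    start = time.perf_counter()
    first = None
    async for _ in fake_agent():
        if first is None:
            # Perceived latency: the moment the user sees anything at all
            first = time.perf_counter() - start
    # Actual latency: the moment the whole response is available
    total = time.perf_counter() - start
    return first, total


if __name__ == "__main__":
    first, total = asyncio.run(measure())
    print(f"first chunk after {first:.3f}s, complete after {total:.3f}s")
```

The total time is the same with or without streaming; only the moment at which the user first sees something moves earlier.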

Without streaming vs. with streaming

sequenceDiagram
    participant U as Usuario
    participant A as Agente
    participant C as Claude API

    rect rgb(255, 220, 220)
        Note over U,C: Sin Streaming
        U->>A: "Refactoriza mi código"
        A->>C: Petición completa
        Note over C: Procesando... 30s
        C->>A: Respuesta completa
        A->>U: Resultado (30s después)
        Note over U: ¿Está funcionando?
    end

    rect rgb(220, 255, 220)
        Note over U,C: Con Streaming
        U->>A: "Refactoriza mi código"
        A->>C: Petición con stream=true
        C-->>A: MessageStart
        C-->>A: TextDelta "Analizando..."
        C-->>A: ToolUse "Read archivo.py"
        A-->>U: "Leyendo archivo..."
        C-->>A: TextDelta "Encontré 3 problemas..."
        A-->>U: "Encontré 3 problemas..."
        C-->>A: ToolUse "Write archivo.py"
        A-->>U: "Aplicando cambios..."
        C-->>A: ResultMessage
        A->>U: "¡Listo!" (feedback continuo)
    end

The visual difference is dramatic. With streaming, the user knows what the agent is doing at every moment.

The SDK's async generator is already native streaming

One of the big advantages of the Claude Code SDK is that it already implements streaming internally. The query() function returns an async generator that emits messages as they arrive from the Claude Code process, not when the whole run finishes.

from claude_code_sdk import query, ClaudeCodeOptions

async def main():
    # query() retorna AsyncGenerator[Message, None]
    # Cada iteración retorna cuando hay un nuevo mensaje disponible
    # NO espera a que todo termine
    async for message in query(
        prompt="Analiza este proyecto",
        options=ClaudeCodeOptions(cwd="/mi/proyecto")
    ):
        # Este código corre tan pronto como llega un mensaje
        print(f"Nuevo mensaje: {type(message).__name__}")

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

async function main() {
  // query() retorna AsyncGenerator<SDKMessage>
  // Cada yield ocurre cuando llega un nuevo mensaje
  for await (const message of query({
    prompt: "Analiza este proyecto",
    options: { cwd: "/mi/proyecto" } as ClaudeCodeOptions,
  })) {
    // Este código corre tan pronto como llega un mensaje
    console.log(`Nuevo mensaje: ${message.type}`);
  }
}

Message types and their order over time

The SDK emits messages in a predictable order. Understanding this order is key to implementing effective streaming:

sequenceDiagram
    participant SDK as SDK Generator
    participant App as Tu Aplicación

    SDK-->>App: SystemMessage (configuración inicial)
    Note over App: Mostrar "Iniciando agente..."

    SDK-->>App: AssistantMessage (texto + herramientas)
    Note over App: Mostrar texto parcial

    SDK-->>App: AssistantMessage (tool_use: Bash)
    Note over App: Mostrar "Ejecutando comando..."

    SDK-->>App: AssistantMessage (más texto)
    Note over App: Actualizar UI

    SDK-->>App: ResultMessage (resumen final)
    Note over App: Mostrar resultado final, costo

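The main types, in the order shown in the diagram above, are:

  1. SystemMessage: initial configuration, emitted when the agent starts
  2. AssistantMessage: the assistant's text and tool calls
  3. ResultMessage: the final summary, including the result and cost

Each AssistantMessage can contain multiple content blocks, and each block can be a TextBlock (assistant text) or a ToolUseBlock (a call to a tool).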
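
One way to use this ordering is to map each message type to a status line in the UI. The sketch below does not import the SDK: the three dataclasses are simplified, hypothetical stand-ins with only the fields the example needs, and `status_for` is an illustrative helper name.

```python
from dataclasses import dataclass, field


# Simplified stand-ins for the SDK message classes (hypothetical shapes,
# reduced to the fields this example needs).
@dataclass
class SystemMessage:
    subtype: str = "init"


@dataclass
class AssistantMessage:
    content: list = field(default_factory=list)


@dataclass
class ResultMessage:
    duration_ms: int = 0


def status_for(message) -> str:
    """Map a message to a short status line for the UI."""
    if isinstance(message, SystemMessage):
        return "Starting agent..."
    if isinstance(message, AssistantMessage):
        return f"Assistant sent {len(message.content)} block(s)"
    if isinstance(message, ResultMessage):
        return f"Done in {message.duration_ms}ms"
    return "Unknown message"


if __name__ == "__main__":
    stream = [
        SystemMessage(),
        AssistantMessage(content=["text", "tool_use"]),
        ResultMessage(duration_ms=1200),
    ]
    for message in stream:
        print(status_for(message))
```

With the real SDK, the same isinstance checks would be applied to the objects yielded by query().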


2. AssistantMessage Streaming

Extracting text in real time

An AssistantMessage contains a list of content blocks. To stream text, iterate over these blocks and display them as each message arrives.

from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from claude_code_sdk.types import TextBlock, ToolUseBlock

async def stream_text(prompt: str, cwd: str):
    """Extrae y muestra texto en tiempo real."""
    full_text = []

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        if not isinstance(message, AssistantMessage):
            continue

        for block in message.content:
            if isinstance(block, TextBlock):
                # Texto del asistente - mostrar inmediatamente
                print(block.text, end="", flush=True)
                full_text.append(block.text)
            elif isinstance(block, ToolUseBlock):
                # El agente está usando una herramienta
                print(f"\n[Usando: {block.name}]", flush=True)

    print()  # Nueva línea al final
    return "".join(full_text)

import { query, ClaudeCodeOptions, AssistantMessage } from "@anthropic-ai/claude-code-sdk";

async function streamText(prompt: string, cwd: string): Promise<string> {
  const fullText: string[] = [];

  for await (const message of query({
    prompt,
    options: { cwd } as ClaudeCodeOptions,
  })) {
    if (message.type !== "assistant") continue;

    const assistantMessage = message as AssistantMessage;
    for (const block of assistantMessage.message.content) {
      if (block.type === "text") {
        process.stdout.write(block.text);
        fullText.push(block.text);
      } else if (block.type === "tool_use") {
        process.stdout.write(`\n[Usando: ${block.name}]\n`);
      }
    }
  }

  process.stdout.write("\n");
  return fullText.join("");
}

Content blocks: text vs. tool_use

An AssistantMessage can contain a mix of block types. It is normal for Claude to alternate between text and tool calls:

from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from claude_code_sdk.types import TextBlock, ToolUseBlock
from dataclasses import dataclass
from typing import Union

@dataclass
class StreamEvent:
    kind: str  # "text" | "tool_start" | "tool_input"
    data: Union[str, dict]

async def stream_with_events(prompt: str, cwd: str):
    """Stream con eventos estructurados."""
    events = []

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        if not isinstance(message, AssistantMessage):
            continue

        for block in message.content:
            if isinstance(block, TextBlock):
                event = StreamEvent(kind="text", data=block.text)
                events.append(event)
                yield event

            elif isinstance(block, ToolUseBlock):
                # Inicio de herramienta
                start_event = StreamEvent(
                    kind="tool_start",
                    data={"name": block.name, "id": block.id}
                )
                events.append(start_event)
                yield start_event

                # Input de la herramienta
                input_event = StreamEvent(
                    kind="tool_input",
                    data=block.input
                )
                events.append(input_event)
                yield input_event

# Usar el generator
async def main():
    async for event in stream_with_events("Refactoriza app.py", "/proyecto"):
        if event.kind == "text":
            print(event.data, end="", flush=True)
        elif event.kind == "tool_start":
            print(f"\n→ Herramienta: {event.data['name']}")
        elif event.kind == "tool_input":
            print(f"  Input: {event.data}")

Showing progress while the agent thinks

There can be pauses between messages while Claude is processing. A spinner or activity indicator improves the UX:

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import sys

async def stream_with_thinking_indicator(prompt: str, cwd: str):
    """Muestra indicador mientras el agente procesa."""
    thinking = False
    spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
    spinner_idx = 0

    async def show_spinner():
        nonlocal spinner_idx
        while thinking:
            char = spinner_chars[spinner_idx % len(spinner_chars)]
            sys.stdout.write(f"\r{char} Pensando...")
            sys.stdout.flush()
            spinner_idx += 1
            await asyncio.sleep(0.1)

    spinner_task = None

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        # Parar spinner cuando llega mensaje
        thinking = False
        if spinner_task and not spinner_task.done():
            spinner_task.cancel()
            sys.stdout.write("\r" + " " * 20 + "\r")  # Limpiar línea

        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text"):
                    print(block.text, end="", flush=True)
                elif hasattr(block, "name"):
                    print(f"\n[{block.name}]", end=" ", flush=True)

        elif isinstance(message, ResultMessage):
            print(f"\n✓ Completado en {message.duration_ms}ms")
            break

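        # Start the spinner while waiting for the next message
        thinking = True
        spinner_task = asyncio.create_task(show_spinner())

    # If the stream ends without a ResultMessage, stop the pending spinner
    thinking = False
    if spinner_task and not spinner_task.done():
        spinner_task.cancel()
        sys.stdout.write("\r" + " " * 20 + "\r")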

Buffering partial text

In some cases you need to accumulate text before processing it (for example, to match complete patterns):

from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
import re

class TextBuffer:
    """Buffer para acumular y procesar texto parcial."""

    def __init__(self, flush_on: str = "\n"):
        self.buffer = ""
        self.flush_on = flush_on
        self.lines = []

    def add(self, text: str) -> list[str]:
        """Agrega texto y retorna líneas completas."""
        self.buffer += text
        complete_lines = []

        while self.flush_on in self.buffer:
            idx = self.buffer.index(self.flush_on)
            line = self.buffer[:idx]
            complete_lines.append(line)
            self.lines.append(line)
            self.buffer = self.buffer[idx + len(self.flush_on):]

        return complete_lines

    def flush(self) -> str:
        """Vacía el buffer restante."""
        remaining = self.buffer
        self.buffer = ""
        return remaining


async def stream_with_buffer(prompt: str, cwd: str):
    buf = TextBuffer(flush_on="\n")

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text"):
                    complete_lines = buf.add(block.text)
                    for line in complete_lines:
                        # Procesar línea completa (por ejemplo, colorear código)
                        if line.startswith("```"):
                            print(f"\033[33m{line}\033[0m")  # Amarillo
                        elif line.startswith("#"):
                            print(f"\033[1m{line}\033[0m")   # Negrita
                        else:
                            print(line)

    # Procesar texto restante
    remaining = buf.flush()
    if remaining:
        print(remaining)

import { query, ClaudeCodeOptions, AssistantMessage } from "@anthropic-ai/claude-code-sdk";

class TextBuffer {
  private buffer = "";
  private lines: string[] = [];

  constructor(private readonly flushOn: string = "\n") {}

  add(text: string): string[] {
    this.buffer += text;
    const completeLines: string[] = [];

    while (this.buffer.includes(this.flushOn)) {
      const idx = this.buffer.indexOf(this.flushOn);
      const line = this.buffer.slice(0, idx);
      completeLines.push(line);
      this.lines.push(line);
      this.buffer = this.buffer.slice(idx + this.flushOn.length);
    }

    return completeLines;
  }

  flush(): string {
    const remaining = this.buffer;
    this.buffer = "";
    return remaining;
  }
}

async function streamWithBuffer(prompt: string, cwd: string) {
  const buf = new TextBuffer("\n");

  for await (const message of query({
    prompt,
    options: { cwd } as ClaudeCodeOptions,
  })) {
    if (message.type !== "assistant") continue;

    const assistantMessage = message as AssistantMessage;
    for (const block of assistantMessage.message.content) {
      if (block.type === "text") {
        const lines = buf.add(block.text);
        for (const line of lines) {
          console.log(line);
        }
      }
    }
  }

  const remaining = buf.flush();
  if (remaining) console.log(remaining);
}
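
The buffering technique above can be checked without the SDK by feeding it text that is split mid-line. The following is a minimal sketch of the same idea written as a plain generator; `iter_lines` is an illustrative name, not part of the chapter's classes or the SDK.

```python
from typing import Iterable, Iterator


def iter_lines(chunks: Iterable[str], sep: str = "\n") -> Iterator[str]:
    """Yield complete lines from text that arrives in arbitrary chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer
        while sep in buffer:
            line, buffer = buffer.split(sep, 1)
            yield line
    # Whatever remains at the end of the stream is a final, unterminated line
    if buffer:
        yield buffer


if __name__ == "__main__":
    # A line break can fall anywhere inside a chunk
    for line in iter_lines(["ab", "c\nde", "f\n", "g"]):
        print(repr(line))
```

As with TextBuffer.flush(), the trailing text without a separator is only emitted once the stream ends.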

3. Real-Time Terminal Output

The Rich library for polished output (Python)

Python's rich library enhances terminal output with colors, tables, progress bars, and panels. It is a good fit for interactive agents:

from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.spinner import Spinner
from rich.text import Text
from rich.table import Table
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage

console = Console()

async def rich_streaming_agent(prompt: str, cwd: str):
    """Agente con output rico en terminal."""
    tool_usage = []
    current_text = Text()

    with Live(
        Panel(Spinner("dots", text="Iniciando agente..."), title="Claude Agent"),
        console=console,
        refresh_per_second=10
    ) as live:

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "text"):
                        current_text.append(block.text)
                        live.update(Panel(
                            current_text,
                            title="[bold green]Claude Agent[/]",
                            border_style="green"
                        ))

                    elif hasattr(block, "name"):
                        tool_name = block.name
                        tool_usage.append(tool_name)

                        # Mostrar herramienta en uso
                        tools_table = Table(show_header=False, box=None)
                        for tool in tool_usage[-5:]:  # Últimas 5
                            tools_table.add_row(f"→ [cyan]{tool}[/]")

                        from rich.columns import Columns
                        live.update(Panel(
                            Columns([current_text, tools_table]),
                            title="[bold green]Claude Agent[/]"
                        ))

            elif isinstance(message, ResultMessage):
                # Mostrar resumen final
                summary = Table(title="Resumen", show_header=True)
                summary.add_column("Métrica")
                summary.add_column("Valor", style="cyan")
                summary.add_row("Duración", f"{message.duration_ms}ms")
                summary.add_row("Herramientas", str(len(tool_usage)))
                summary.add_row("Herramientas usadas", ", ".join(set(tool_usage)))

                live.update(Panel(summary, title="[bold blue]Completado[/]"))

    return message

A spinner while the agent works

from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage

console = Console()

async def agent_with_spinner(prompt: str, cwd: str):
    """Agente con spinner elegante."""
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        TimeElapsedColumn(),
        console=console,
        transient=True  # Se borra al terminar
    ) as progress:

        task = progress.add_task("Iniciando agente...", total=None)

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "name"):
                        progress.update(
                            task,
                            description=f"Usando [cyan]{block.name}[/]..."
                        )
                    elif hasattr(block, "text") and block.text.strip():
                        # Mostrar primeras palabras del texto
                        preview = block.text[:50].replace("\n", " ")
                        progress.update(task, description=f"Pensando: {preview}...")

    console.print("[green]✓ Agente completado[/green]")

TypeScript equivalent with chalk/ora

import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import ora, { Ora } from "ora";
import chalk from "chalk";

async function richStreamingAgent(prompt: string, cwd: string) {
  const spinner: Ora = ora("Iniciando agente...").start();
  const toolsUsed: string[] = [];
  const outputLines: string[] = [];

  try {
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      if (message.type === "assistant") {
        const assistantMessage = message as AssistantMessage;

        for (const block of assistantMessage.message.content) {
          if (block.type === "text" && block.text.trim()) {
            spinner.stop();
            const lines = block.text.split("\n");
            for (const line of lines) {
              if (line.startsWith("# ")) {
                console.log(chalk.bold.blue(line));
              } else if (line.startsWith("```")) {
                console.log(chalk.yellow(line));
              } else {
                console.log(line);
              }
              outputLines.push(line);
            }
            spinner.start("Procesando...");

          } else if (block.type === "tool_use") {
            toolsUsed.push(block.name);
            spinner.text = chalk.cyan(`Usando: ${block.name}`);
          }
        }
      } else if (message.type === "result") {
        const resultMessage = message as ResultMessage;
        spinner.succeed(chalk.green("Agente completado"));

        console.log("\n" + chalk.bold("=== Resumen ==="));
        console.log(`Duración: ${chalk.cyan(resultMessage.duration_ms + "ms")}`);
        console.log(`Herramientas: ${chalk.cyan(toolsUsed.join(", ") || "ninguna")}`);
      }
    }
  } catch (error) {
    spinner.fail(chalk.red(`Error: ${error}`));
    throw error;
  }
}

4. Server-Sent Events (SSE)

What are SSE?

Server-Sent Events (SSE) is a standard mechanism for one-way streaming from server to client over plain HTTP. It suits agents well because:

  1. It uses standard HTTP (no WebSocket required)
  2. The browser reconnects automatically
  3. It is natively supported in all modern browsers
  4. It is simpler than WebSockets for one-way streaming

sequenceDiagram
    participant B as Navegador
    participant S as Servidor SSE
    participant A as Claude Agent

    B->>S: POST /api/agent/run (Accept: text/event-stream)
    S->>A: Iniciar agente

    loop Streaming
        A-->>S: Nuevo mensaje
        S-->>B: data: {"type":"text","content":"..."}\n\n
        B->>B: Actualizar UI
    end

    S-->>B: data: {"type":"done"}\n\n
    B->>B: Mostrar resultado final
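
Each frame in the diagram follows the SSE wire format: one or more `field: value` lines terminated by a blank line. The sketch below builds such frames; `format_sse` is an illustrative helper name. The endpoints in this chapter only use the `data:` field, so the optional `event:` and `id:` fields are shown here as an assumption about what a fuller implementation might send.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one SSE frame. Non-string data is JSON-encoded."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    payload = data if isinstance(data, str) else json.dumps(data)
    # A payload containing newlines must be sent as several data: lines
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    # The empty line marks the end of the frame
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse({"type": "text", "content": "hola"}), end="")
    print(format_sse("line 1\nline 2", event="text", event_id="7"), end="")
```

Centralizing the framing in one function avoids repeating the `data: ...\n\n` f-string at every yield and keeps multi-line payloads from breaking the stream.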

Exposing the agent as an SSE endpoint with FastAPI

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import json
import asyncio

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


async def agent_stream_generator(prompt: str, cwd: str):
    """Generator que emite eventos SSE."""

    # Evento de inicio
    yield f"data: {json.dumps({'type': 'start', 'prompt': prompt})}\n\n"

    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "text"):
                        event = {
                            "type": "text",
                            "content": block.text
                        }
                        yield f"data: {json.dumps(event)}\n\n"

                    elif hasattr(block, "name"):
                        event = {
                            "type": "tool_use",
                            "tool": block.name,
                            "input": block.input if hasattr(block, "input") else {}
                        }
                        yield f"data: {json.dumps(event)}\n\n"

            elif isinstance(message, ResultMessage):
                event = {
                    "type": "result",
                    "subtype": message.subtype,
                    "duration_ms": message.duration_ms,
                    # Recent SDK versions expose the cost as total_cost_usd
                    "cost_usd": getattr(message, "total_cost_usd", None),
                }
                yield f"data: {json.dumps(event)}\n\n"

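    except Exception as e:
        error_event = {"type": "error", "message": str(e)}
        yield f"data: {json.dumps(error_event)}\n\n"

    # Emit "done" after the try block instead of inside `finally`: yielding from
    # `finally` raises RuntimeError when the generator is closed early
    # (for example, when the client disconnects).
    yield f"data: {json.dumps({'type': 'done'})}\n\n"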


@app.post("/api/agent/run")
async def run_agent(request: Request):
    body = await request.json()
    prompt = body.get("prompt", "")
    cwd = body.get("cwd", "/tmp")

    return StreamingResponse(
        agent_stream_generator(prompt, cwd),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Para nginx
        }
    )


# Ejecutar: uvicorn main:app --reload

Exposing the agent over SSE with Express/Hono

import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import express, { Request, Response } from "express";
import cors from "cors";

const app = express();
app.use(cors());
app.use(express.json());

app.post("/api/agent/run", async (req: Request, res: Response) => {
  const { prompt, cwd = "/tmp" } = req.body;

  // Headers para SSE
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");

  const sendEvent = (data: object) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  sendEvent({ type: "start", prompt });

  try {
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      if (message.type === "assistant") {
        const assistantMessage = message as AssistantMessage;
        for (const block of assistantMessage.message.content) {
          if (block.type === "text") {
            sendEvent({ type: "text", content: block.text });
          } else if (block.type === "tool_use") {
            sendEvent({ type: "tool_use", tool: block.name, input: block.input });
          }
        }
      } else if (message.type === "result") {
        const resultMessage = message as ResultMessage;
        sendEvent({
          type: "result",
          duration_ms: resultMessage.duration_ms,
        });
      }
    }
  } catch (error) {
    sendEvent({ type: "error", message: String(error) });
  } finally {
    sendEvent({ type: "done" });
    res.end();
  }
});

app.listen(3000, () => console.log("SSE server on port 3000"));

SSE client in the browser

<!DOCTYPE html>
<html lang="es">
<head>
    <meta charset="UTF-8">
    <title>Claude Agent en Tiempo Real</title>
    <style>
        body { font-family: monospace; max-width: 800px; margin: 2rem auto; padding: 1rem; }
        #output { background: #1e1e1e; color: #d4d4d4; padding: 1rem; min-height: 300px;
                  border-radius: 8px; white-space: pre-wrap; overflow-y: auto; }
        .tool-use { color: #4ec9b0; }
        .error { color: #f44747; }
        .done { color: #608b4e; font-weight: bold; }
        button { padding: 0.5rem 1rem; margin: 0.5rem; cursor: pointer; }
        #prompt { width: 100%; padding: 0.5rem; margin-bottom: 1rem; }
    </style>
</head>
<body>
    <h1>Claude Agent - Streaming Demo</h1>
    <textarea id="prompt" rows="3" placeholder="Escribe tu tarea...">Lista los archivos del directorio actual</textarea>
    <br>
    <button onclick="runAgent()">Ejecutar</button>
    <button onclick="stopAgent()">Detener</button>
    <div id="output"></div>

    <script>
        let eventSource = null;
        const output = document.getElementById("output");

        function appendToOutput(text, className = "") {
            const span = document.createElement("span");
            if (className) span.className = className;
            span.textContent = text;
            output.appendChild(span);
            output.scrollTop = output.scrollHeight;
        }

        async function runAgent() {
            if (eventSource) eventSource.close();
            output.innerHTML = "";

            const prompt = document.getElementById("prompt").value;

            // SSE via POST necesita fetch + ReadableStream
            const response = await fetch("/api/agent/run", {
                method: "POST",
                headers: { "Content-Type": "application/json" },
                body: JSON.stringify({ prompt, cwd: "/tmp" })
            });

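            const reader = response.body.getReader();
            // Keep a handle so stopAgent() can cancel the stream;
            // without this assignment, eventSource stays null and "Detener" does nothing
            eventSource = { close: () => reader.cancel() };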
            const decoder = new TextDecoder();
            let buffer = "";

            while (true) {
                const { done, value } = await reader.read();
                if (done) break;

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split("\n\n");
                buffer = lines.pop() || "";

                for (const chunk of lines) {
                    if (!chunk.startsWith("data: ")) continue;
                    const jsonStr = chunk.slice(6);
                    try {
                        const event = JSON.parse(jsonStr);
                        handleEvent(event);
                    } catch (e) {
                        console.error("Parse error:", e);
                    }
                }
            }
        }

        function handleEvent(event) {
            switch (event.type) {
                case "start":
                    appendToOutput(`> ${event.prompt}\n\n`);
                    break;
                case "text":
                    appendToOutput(event.content);
                    break;
                case "tool_use":
                    appendToOutput(`\n[${event.tool}]\n`, "tool-use");
                    break;
                case "error":
                    appendToOutput(`\nERROR: ${event.message}\n`, "error");
                    break;
                case "done":
                    appendToOutput("\n\n--- Completado ---\n", "done");
                    break;
            }
        }

        function stopAgent() {
            if (eventSource) {
                eventSource.close();
                appendToOutput("\n[Detenido por usuario]\n", "error");
            }
        }
    </script>
</body>
</html>

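Automatic reconnection in SSE

With native SSE (GET via EventSource), the browser reconnects automatically. With POST-based SSE, you have to implement it yourself. Note that the class below re-sends the original request on each retry, so the agent run starts over rather than resuming where it left off: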

class ResilientAgentStream {
    constructor(prompt, cwd, onEvent) {
        this.prompt = prompt;
        this.cwd = cwd;
        this.onEvent = onEvent;
        this.retryCount = 0;
        this.maxRetries = 3;
        this.retryDelay = 1000;
        this.abortController = null;
    }

    async start() {
        while (this.retryCount <= this.maxRetries) {
            try {
                await this.connect();
                break; // Éxito, salir del loop
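            } catch (error) {
                // A deliberate stop() aborts the fetch; do not retry in that case
                if (error.name === "AbortError") break;
                this.retryCount++;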
                if (this.retryCount > this.maxRetries) {
                    this.onEvent({ type: "error", message: "Max retries reached" });
                    break;
                }
                const delay = this.retryDelay * Math.pow(2, this.retryCount - 1);
                this.onEvent({ type: "reconnecting", attempt: this.retryCount, delay });
                await new Promise(r => setTimeout(r, delay));
            }
        }
    }

    async connect() {
        this.abortController = new AbortController();
        const response = await fetch("/api/agent/run", {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({ prompt: this.prompt, cwd: this.cwd }),
            signal: this.abortController.signal
        });

        if (!response.ok) throw new Error(`HTTP ${response.status}`);

        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = "";

        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
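            buffer += decoder.decode(value, { stream: true });
            // Split on the blank line that ends each SSE frame, keeping any partial frame
            const chunks = buffer.split("\n\n");
            buffer = chunks.pop() || "";
            for (const chunk of chunks) {
                if (chunk.startsWith("data: ")) this.onEvent(JSON.parse(chunk.slice(6)));
            }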
        }
    }

    stop() {
        if (this.abortController) {
            this.abortController.abort();
        }
    }
}
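
The retry delays used by the class above follow an exponential schedule: the base delay doubles on each attempt. A minimal sketch of that arithmetic follows; `backoff_delays` is an illustrative name, and the `cap_ms` upper bound is an addition that the class above does not have.

```python
def backoff_delays(base_ms: int = 1000, max_retries: int = 3, cap_ms: int = 30_000) -> list[int]:
    """Delay in milliseconds before each retry: base * 2^(attempt - 1), capped at cap_ms."""
    return [
        min(base_ms * 2 ** (attempt - 1), cap_ms)
        for attempt in range(1, max_retries + 1)
    ]


if __name__ == "__main__":
    # Same parameters as ResilientAgentStream: retryDelay=1000, maxRetries=3
    print(backoff_delays())
```

With the defaults this gives 1000, 2000, and 4000 ms, matching `retryDelay * Math.pow(2, retryCount - 1)` for attempts 1 through 3. A cap is worth considering if maxRetries is raised, since the delay otherwise keeps doubling.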

5. WebSockets

The agent as a WebSocket server

WebSockets allow two-way communication: the user can send messages while the agent is running, and multiple clients can watch the same agent.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import json
import asyncio

app = FastAPI()


class AgentSession:
    """Sesión de agente con WebSocket."""

    def __init__(self, websocket: WebSocket, session_id: str):
        self.websocket = websocket
        self.session_id = session_id
        self.active = True

    async def send(self, event: dict):
        if self.active:
            await self.websocket.send_text(json.dumps(event))

    async def run_agent(self, prompt: str, cwd: str):
        await self.send({"type": "agent_start", "session_id": self.session_id})

        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=cwd)
            ):
                if isinstance(message, AssistantMessage):
                    for block in message.content:
                        if hasattr(block, "text"):
                            await self.send({
                                "type": "text",
                                "content": block.text
                            })
                        elif hasattr(block, "name"):
                            await self.send({
                                "type": "tool_use",
                                "tool": block.name
                            })

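                elif isinstance(message, ResultMessage):
                    await self.send({
                        "type": "agent_done",
                        "duration_ms": message.duration_ms,
                        # Recent SDK versions expose the cost as total_cost_usd
                        "cost_usd": getattr(message, "total_cost_usd", None),
                    })

        except asyncio.CancelledError:
            await self.send({"type": "agent_cancelled"})
            raise  # re-raise so the task is actually marked as cancelled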
        except Exception as e:
            await self.send({"type": "error", "message": str(e)})


sessions: dict[str, AgentSession] = {}


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    await websocket.accept()

    session = AgentSession(websocket, session_id)
    sessions[session_id] = session

    agent_task = None

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            if message["action"] == "start_agent":
                if agent_task and not agent_task.done():
                    agent_task.cancel()

                agent_task = asyncio.create_task(
                    session.run_agent(
                        prompt=message["prompt"],
                        cwd=message.get("cwd", "/tmp")
                    )
                )

            elif message["action"] == "cancel_agent":
                if agent_task and not agent_task.done():
                    agent_task.cancel()
                    await session.send({"type": "cancelled"})

    except WebSocketDisconnect:
        # Stop run_agent from writing to the closed socket
        session.active = False
        if agent_task and not agent_task.done():
            agent_task.cancel()
        sessions.pop(session_id, None)
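
The receive loop above indexes `message["action"]` directly, so malformed JSON or a missing key raises inside the handler. A minimal sketch of validating client messages before dispatching them, assuming the same `action` / `prompt` / `cwd` message shape. `ClientMessage` and `parse_client_message` are hypothetical helper names, not part of FastAPI or the SDK:

```python
import json
from dataclasses import dataclass
from typing import Optional

VALID_ACTIONS = {"start_agent", "cancel_agent"}


@dataclass
class ClientMessage:
    """Validated form of one message sent by the client (hypothetical helper)."""
    action: str
    prompt: Optional[str] = None
    cwd: str = "/tmp"


def parse_client_message(raw: str) -> ClientMessage:
    """Parse and validate a raw WebSocket text frame.

    Raises ValueError with a readable reason instead of letting
    JSONDecodeError or KeyError escape from the receive loop.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid JSON: {exc.msg}") from exc

    if not isinstance(data, dict):
        raise ValueError("message must be a JSON object")

    action = data.get("action")
    if action not in VALID_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")

    prompt = data.get("prompt")
    if action == "start_agent" and (not isinstance(prompt, str) or not prompt.strip()):
        raise ValueError("start_agent requires a non-empty 'prompt'")

    cwd = data.get("cwd", "/tmp")
    if not isinstance(cwd, str):
        raise ValueError("'cwd' must be a string")

    return ClientMessage(action=action, prompt=prompt, cwd=cwd)
```

In the endpoint, the `ValueError` could be caught and reported with `session.send({"type": "error", ...})`, which keeps the connection open after a bad message.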

Multiple clients watching the same agent

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
import json
import asyncio

app = FastAPI()


class BroadcastSession:
    """Un agente que puede tener múltiples observadores."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.observers: set[WebSocket] = set()
        self.agent_task: asyncio.Task | None = None
        self.messages: list[dict] = []  # Historial para nuevos observadores

    async def add_observer(self, websocket: WebSocket):
        """Agrega un observador y le envía el historial."""
        self.observers.add(websocket)
        # Enviar historial de mensajes previos
        for msg in self.messages:
            await websocket.send_text(json.dumps({**msg, "replay": True}))

    def remove_observer(self, websocket: WebSocket):
        self.observers.discard(websocket)

    async def broadcast(self, event: dict):
        """Envía un evento a todos los observadores."""
        self.messages.append(event)
        dead_connections = set()

        for ws in self.observers.copy():
            try:
                await ws.send_text(json.dumps(event))
            except Exception:
                dead_connections.add(ws)

        self.observers -= dead_connections

    async def run_agent(self, prompt: str, cwd: str):
        await self.broadcast({"type": "start", "prompt": prompt})

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "text"):
                        await self.broadcast({"type": "text", "content": block.text})
                    elif hasattr(block, "name"):
                        await self.broadcast({"type": "tool", "name": block.name})

            elif isinstance(message, ResultMessage):
                await self.broadcast({"type": "done", "cost": message.cost_usd})


broadcast_sessions: dict[str, BroadcastSession] = {}


@app.websocket("/ws/watch/{session_id}")
async def watch_session(websocket: WebSocket, session_id: str):
    """Observar una sesión existente."""
    await websocket.accept()

    if session_id not in broadcast_sessions:
        await websocket.send_text(json.dumps({"type": "error", "message": "Sesión no encontrada"}))
        await websocket.close()
        return

    session = broadcast_sessions[session_id]
    await session.add_observer(websocket)

    try:
        while True:
            await websocket.receive_text()  # Mantener viva la conexión
    except WebSocketDisconnect:
        session.remove_observer(websocket)
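
`BroadcastSession.messages` grows for as long as the agent runs, and every new observer replays all of it. One way to cap that is sketched below with `collections.deque(maxlen=...)`. `ReplayBuffer` and the 500-event default are illustrative assumptions, not part of the original design:

```python
from collections import deque


class ReplayBuffer:
    """Keeps only the most recent events for late-joining observers."""

    def __init__(self, max_events: int = 500):
        # A deque with maxlen silently drops its oldest item on overflow
        self._events: deque = deque(maxlen=max_events)
        self.dropped = 0  # how many old events have been evicted

    def append(self, event: dict) -> None:
        if len(self._events) == self._events.maxlen:
            self.dropped += 1
        self._events.append(event)

    def replay(self) -> list:
        """Events to send to a new observer, oldest first, marked as replayed."""
        return [{**event, "replay": True} for event in self._events]
```

A new observer would receive `buffer.replay()` instead of the full history; `dropped` lets the server tell the client that earlier events were truncated.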

TypeScript with the ws library

import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import { WebSocketServer, WebSocket } from "ws";
import { IncomingMessage } from "http";

interface ClientMessage {
  action: "start_agent" | "cancel_agent";
  prompt?: string;
  cwd?: string;
}

const wss = new WebSocketServer({ port: 8080 });

wss.on("connection", (ws: WebSocket, req: IncomingMessage) => {
  console.log("Cliente conectado");
  let agentController: AbortController | null = null;

  ws.on("message", async (data: Buffer) => {
    const message: ClientMessage = JSON.parse(data.toString());

    if (message.action === "start_agent" && message.prompt) {
      // Cancel the previous agent, if any
      agentController?.abort();
      // Keep a local reference: a later start_agent reassigns agentController,
      // and this loop must keep checking its own controller, not the new one.
      const controller = new AbortController();
      agentController = controller;

      const sendEvent = (event: object) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify(event));
        }
      };

      sendEvent({ type: "start" });

      try {
        for await (const msg of query({
          prompt: message.prompt,
          options: { cwd: message.cwd ?? "/tmp" } as ClaudeCodeOptions,
        })) {
          if (controller.signal.aborted) break;

          if (msg.type === "assistant") {
            const assistantMsg = msg as AssistantMessage;
            for (const block of assistantMsg.message.content) {
              if (block.type === "text") {
                sendEvent({ type: "text", content: block.text });
              } else if (block.type === "tool_use") {
                sendEvent({ type: "tool_use", tool: block.name });
              }
            }
          } else if (msg.type === "result") {
            const resultMsg = msg as ResultMessage;
            sendEvent({ type: "done", duration_ms: resultMsg.duration_ms });
          }
        }
      } catch (error) {
        if (!controller.signal.aborted) {
          sendEvent({ type: "error", message: String(error) });
        }
      }
    } else if (message.action === "cancel_agent") {
      agentController?.abort();
      ws.send(JSON.stringify({ type: "cancelled" }));
    }
  });

  ws.on("close", () => {
    agentController?.abort();
    console.log("Cliente desconectado");
  });
});

console.log("WebSocket server en puerto 8080");

6. Streaming to a Log File

Writing each event to a file in real time

For long-running background jobs, it is useful to log each event as soon as it arrives:

import json
import asyncio
from pathlib import Path
from datetime import datetime
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage


async def agent_with_file_log(prompt: str, cwd: str, log_path: str):
    """Agente que escribe cada evento a un archivo de log."""
    log_file = Path(log_path)
    log_file.parent.mkdir(parents=True, exist_ok=True)

    with open(log_file, "w", buffering=1) as f:  # buffering=1 = line buffered
        def log(event: dict):
            entry = {
                "timestamp": datetime.now().isoformat(),
                **event
            }
            f.write(json.dumps(entry) + "\n")

        log({"type": "session_start", "prompt": prompt, "cwd": cwd})

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "text"):
                        log({"type": "text", "content": block.text})
                    elif hasattr(block, "name"):
                        log({"type": "tool_use", "tool": block.name})

            elif isinstance(message, ResultMessage):
                log({
                    "type": "session_end",
                    "duration_ms": message.duration_ms,
                    "cost_usd": message.cost_usd,
                })

    return log_file


# Lanzar en background y seguir el log
async def run_background_agent():
    log_path = f"/tmp/agent_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"

    # Lanzar agente en background
    task = asyncio.create_task(
        agent_with_file_log(
            prompt="Analiza y documenta todo el proyecto",
            cwd="/mi/proyecto",
            log_path=log_path
        )
    )

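    print(f"Agent running in the background. Log: {log_path}")
    # The log is JSON Lines (one object per line); `python -m json.tool` expects
    # a single JSON document, so this hint uses jq (assumed to be installed).
    print(f"Follow the log with: tail -f {log_path} | jq .")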

    return task

Tailing the file while the agent runs

import asyncio
from pathlib import Path
import json


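async def tail_log(log_path: str):
    """Follow a log file in real time until a session_end event appears."""
    path = Path(log_path)

    # Wait for the file to exist
    while not path.exists():
        await asyncio.sleep(0.1)

    with open(path, "r") as f:
        # Jump to the end, like `tail -f`: events written before this
        # point (including an earlier session_end) are not shown.
        f.seek(0, 2)

        pending = ""  # holds a partially written line until its newline arrives
        while True:
            chunk = f.readline()
            if not chunk:
                await asyncio.sleep(0.05)
                continue

            pending += chunk
            if not pending.endswith("\n"):
                continue  # the writer has not finished this line yet
            line, pending = pending.strip(), ""

            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed lines

            event_type = event.get("type", "")
            if event_type == "text":
                print(event["content"], end="", flush=True)
            elif event_type == "tool_use":
                print(f"\n[{event['tool']}]", end=" ", flush=True)
            elif event_type == "session_end":
                print(f"\n\n✓ Completed in {event['duration_ms']}ms")
                break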
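
Because each line is a standalone JSON object, a finished (or interrupted) log can also be summarized after the fact. A small sketch, assuming the event shapes written by `agent_with_file_log` (`type`, `tool`, `duration_ms`). `summarize_log` is a hypothetical helper:

```python
import json
from collections import Counter
from pathlib import Path


def summarize_log(log_path: str) -> dict:
    """Count events and tool calls in a JSON Lines agent log."""
    event_counts = Counter()
    tool_counts = Counter()
    duration_ms = None
    finished = False

    with Path(log_path).open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue  # e.g. a truncated final line from an interrupted run

            event_type = event.get("type", "unknown")
            event_counts[event_type] += 1
            if event_type == "tool_use":
                tool_counts[event.get("tool", "unknown")] += 1
            elif event_type == "session_end":
                finished = True
                duration_ms = event.get("duration_ms")

    return {
        "events": dict(event_counts),
        "tools": dict(tool_counts),
        "duration_ms": duration_ms,
        "finished": finished,
    }
```

A log without a `session_end` line reports `finished: False`, which is a cheap way to detect background jobs that died mid-run.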

7. Progress Tracking

Estimating progress on a long task

The agent does not always know how much work remains, but progress can be estimated with heuristics:

import time
from dataclasses import dataclass, field
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage


@dataclass
class ProgressTracker:
    """Tracks an agent's estimated progress."""
    total_steps: int = 10  # Estimate
    current_step: int = 0
    tool_calls: list[str] = field(default_factory=list)
    # monotonic() is unaffected by system clock adjustments
    start_time: float = field(default_factory=time.monotonic)

    def on_tool_use(self, tool_name: str):
        self.tool_calls.append(tool_name)
        # Every tool call counts as one step; stay below 100% until the run ends
        self.current_step = min(self.current_step + 1, self.total_steps - 1)

    def on_text(self, text: str):
        # Keywords that suggest progress. They are Spanish because the sample
        # prompts are; adapt them to the language the agent replies in.
        milestones = {
            "analizando": 1,
            "encontré": 2,
            "modificando": 3,
            "completado": 4,
            "terminado": 4,
        }
        text_lower = text.lower()
        for keyword, bonus in milestones.items():
            if keyword in text_lower:
                self.current_step = min(self.current_step + bonus, self.total_steps - 1)

    @property
    def percentage(self) -> float:
        return (self.current_step / self.total_steps) * 100

    @property
    def elapsed_seconds(self) -> float:
        return time.monotonic() - self.start_time


async def agent_with_progress(prompt: str, cwd: str):
    from tqdm.asyncio import tqdm

    tracker = ProgressTracker(total_steps=20)

    with tqdm(total=100, desc="Progreso", unit="%", bar_format="{l_bar}{bar}| {n:.0f}%") as pbar:

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "text"):
                        tracker.on_text(block.text)
                    elif hasattr(block, "name"):
                        tracker.on_tool_use(block.name)
                        pbar.set_postfix(tool=block.name)

                new_pct = tracker.percentage
                delta = new_pct - pbar.n
                if delta > 0:
                    pbar.update(delta)

            elif isinstance(message, ResultMessage):
                pbar.update(100 - pbar.n)  # Completar al 100%
                pbar.set_postfix(done=True)
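
A fixed `total_steps` guess either stalls at the cap or undershoots. An alternative heuristic, offered as a sketch rather than as this chapter's method, maps the step count through `1 - exp(-steps / scale)` so the bar always moves but reaches 100% only when the result arrives. `asymptotic_progress` and the `scale` parameter are assumptions:

```python
import math


def asymptotic_progress(steps: int, scale: float = 8.0, done: bool = False) -> float:
    """Return a percentage in [0, 100] that approaches but never reaches 100.

    steps: number of tool calls (or other progress events) seen so far.
    scale: roughly the step count at which the bar shows about 63%.
    done:  set once the final result message arrives.
    """
    if done:
        return 100.0
    if steps <= 0:
        return 0.0
    # Cap at 99 so rounding never displays 100% before completion
    return min(99.0, 100.0 * (1.0 - math.exp(-steps / scale)))
```

With tqdm, the caller would compute `asymptotic_progress(len(tracker.tool_calls))` after each message and advance the bar by the difference from `pbar.n`, as the loop above already does with `tracker.percentage`.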

TypeScript with cli-progress

import { query, ClaudeCodeOptions, AssistantMessage } from "@anthropic-ai/claude-code-sdk";
import cliProgress from "cli-progress";
import chalk from "chalk";

async function agentWithProgress(prompt: string, cwd: string) {
  const bar = new cliProgress.SingleBar(
    {
      format: `Claude Agent |${chalk.cyan("{bar}")}| {percentage}% | {tool}`,
      barCompleteChar: "\u2588",
      barIncompleteChar: "\u2591",
    },
    cliProgress.Presets.shades_classic
  );

  bar.start(100, 0, { tool: "iniciando..." });

  let progress = 0;
  const increment = (amount: number, tool = "") => {
    progress = Math.min(progress + amount, 99);
    bar.update(progress, { tool });
  };

  for await (const message of query({
    prompt,
    options: { cwd } as ClaudeCodeOptions,
  })) {
    if (message.type === "assistant") {
      const assistantMessage = message as AssistantMessage;
      for (const block of assistantMessage.message.content) {
        if (block.type === "tool_use") {
          increment(5, block.name);
        } else if (block.type === "text") {
          increment(2, "procesando...");
        }
      }
    } else if (message.type === "result") {
      bar.update(100, { tool: "completado" });
    }
  }

  bar.stop();
}

Milestone notifications

from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from typing import Callable


async def agent_with_milestones(
    prompt: str,
    cwd: str,
    on_milestone: Callable[[str, dict], None]
):
    """Agent that notifies on key milestones (each one fires at most once)."""
    tool_count = 0
    text_accumulator = []
    fired: set[str] = set()

    def notify(name: str, data: dict):
        # The accumulated text keeps matching after the first hit, so guard
        # against reporting the same milestone on every later text block.
        if name not in fired:
            fired.add(name)
            on_milestone(name, data)

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "name"):
                    tool_count += 1

                    # Milestone: first tool call
                    if tool_count == 1:
                        notify("first_tool", {"tool": block.name})

                    # Milestone: 5 tool calls (complex task)
                    if tool_count == 5:
                        notify("complex_task", {"tool_count": tool_count})

                elif hasattr(block, "text"):
                    text_accumulator.append(block.text)
                    full_text = " ".join(text_accumulator).lower()

                    if "error" in full_text and "corregido" not in full_text:
                        notify("error_detected", {"text": block.text})

                    if any(w in full_text for w in ["completado", "listo", "terminado"]):
                        notify("near_completion", {})


# Uso
def handle_milestone(name: str, data: dict):
    icons = {
        "first_tool": "🔧",
        "complex_task": "⚙️",
        "error_detected": "⚠️",
        "near_completion": "✅"
    }
    print(f"\n{icons.get(name, '📍')} Hito: {name} - {data}")


import asyncio
asyncio.run(agent_with_milestones(
    "Refactoriza el proyecto completo",
    "/mi/proyecto",
    handle_milestone
))

8. Cancelling Streams

AbortController in TypeScript

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

async function cancellableAgent(
  prompt: string,
  cwd: string,
  timeoutMs: number = 60000
): Promise<void> {
  const controller = new AbortController();

  // Auto-cancel after the timeout
  const timeoutId = setTimeout(() => {
    controller.abort(new Error(`Timeout after ${timeoutMs}ms`));
  }, timeoutMs);

  try {
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      // This check only runs when a message arrives, so cancellation takes
      // effect at the next message rather than immediately.
      if (controller.signal.aborted) {
        console.log("Agent cancelled");
        break;
      }

      // Process the message...
      console.log(message.type);
    }
  } catch (error) {
    if (controller.signal.aborted) {
      console.log("Stream cancelled cleanly");
    } else {
      throw error;
    }
  } finally {
    clearTimeout(timeoutId);
    // Release resources here
  }
}

// Manual cancellation from the UI
const controller = new AbortController();

// In a "Cancel" button handler:
function onCancelClick() {
  controller.abort(new Error("Cancelled by user"));
}

asyncio.CancelledError in Python

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


async def cancellable_agent(prompt: str, cwd: str):
    """Agent that handles cancellation cleanly."""
    cancelled = False

    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            # Process the message
            print(f"Message: {type(message).__name__}")

    except asyncio.CancelledError:
        print("Agent cancelled, cleaning up...")
        cancelled = True
        # Here: save state, close connections, etc.
        raise  # Important: re-raise so asyncio knows the task was cancelled

    finally:
        if not cancelled:
            print("Normal cleanup")
        # Always runs: close files, connections, etc.


async def main_with_cancel():
    task = asyncio.create_task(
        cancellable_agent("Tarea muy larga", "/proyecto")
    )

    # Cancelar después de 10 segundos
    await asyncio.sleep(10)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Tarea cancelada exitosamente")
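
The TypeScript version cancels itself after a timeout. The same effect in Python can be sketched with `asyncio.wait_for`, which cancels the wrapped coroutine when the deadline passes. `fake_stream` stands in for `query(...)` so the example runs without the SDK, and `run_with_timeout` is a hypothetical name:

```python
import asyncio
from typing import AsyncIterator, Callable, List, Tuple


async def fake_stream(n: int, delay: float) -> AsyncIterator[str]:
    """Stand-in for query(): yields n messages, one every `delay` seconds."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"message-{i}"


async def run_with_timeout(
    make_stream: Callable[[], AsyncIterator[str]],
    timeout_s: float,
) -> Tuple[List[str], bool]:
    """Consume a stream until it ends or the timeout fires.

    Returns (messages received so far, whether the timeout fired).
    """
    received: List[str] = []

    async def consume() -> None:
        async for message in make_stream():
            received.append(message)

    try:
        # wait_for cancels consume() when the deadline passes, which raises
        # CancelledError inside the `async for` and stops the iteration.
        await asyncio.wait_for(consume(), timeout=timeout_s)
        return received, False
    except asyncio.TimeoutError:
        return received, True
```

Messages that arrived before the deadline are kept, so the caller can still show partial output alongside a timeout notice.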

Graceful agent shutdown

import asyncio
import signal
from claude_code_sdk import query, ClaudeCodeOptions


class GracefulAgent:
    def __init__(self):
        self._shutdown = asyncio.Event()
        self._current_task: asyncio.Task | None = None

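    def setup_signal_handlers(self):
        """Handle Ctrl+C (SIGINT) and SIGTERM. Call this from inside a running loop."""
        # get_running_loop() is the supported call inside a coroutine;
        # add_signal_handler is only available on Unix event loops.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._handle_shutdown)

    def _handle_shutdown(self):
        print("\nShutdown requested...")
        self._shutdown.set()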
        if self._current_task:
            self._current_task.cancel()

    async def run(self, prompt: str, cwd: str):
        self.setup_signal_handlers()

        async def run_agent():
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=cwd)
            ):
                if self._shutdown.is_set():
                    break
                print(f"[{type(message).__name__}]")

        self._current_task = asyncio.create_task(run_agent())

        try:
            await self._current_task
        except asyncio.CancelledError:
            print("Agente detenido gracefully")
        finally:
            print("Recursos liberados")


agent = GracefulAgent()
asyncio.run(agent.run("Tarea larga", "/proyecto"))

9. Backpressure and Buffers

What happens when the consumer is slower than the producer

The Claude Code SDK produces messages at the rate the API delivers them. If your processing code is slower, messages can pile up in memory, so this has to be managed explicitly:

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


async def agent_with_queue(prompt: str, cwd: str):
    """Usa una cola para manejar backpressure."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)

    async def producer():
        """Lee del SDK y pone en cola."""
        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=cwd)
            ):
                await queue.put(message)
        finally:
            await queue.put(None)  # Señal de fin

    async def consumer():
        """Procesa mensajes de la cola (puede ser lento)."""
        while True:
            message = await queue.get()
            if message is None:
                break

            # Simulación de procesamiento lento
            await asyncio.sleep(0.01)
            print(f"Procesando: {type(message).__name__}")
            queue.task_done()

    # Correr productor y consumidor concurrentemente
    await asyncio.gather(producer(), consumer())


async def agent_with_rate_limit(prompt: str, cwd: str, max_messages_per_second: float = 10):
    """Limits the processing rate."""
    import time  # imported once here rather than on every loop iteration

    interval = 1.0 / max_messages_per_second
    last_processed = float("-inf")  # so the first message is never delayed

    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        # monotonic() is unaffected by system clock adjustments
        elapsed = time.monotonic() - last_processed

        if elapsed < interval:
            await asyncio.sleep(interval - elapsed)

        # Process the message
        print(f"Message: {type(message).__name__}")
        last_processed = time.monotonic()
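
To see why `maxsize` matters, the sketch below runs a fast producer against a slow consumer and records the largest backlog observed. With a bounded queue, `put()` suspends the producer once the queue is full, so the backlog never exceeds `maxsize`. The producer is a plain loop standing in for the SDK stream, and `measure_backlog` is a hypothetical name:

```python
import asyncio


async def measure_backlog(n_messages: int, maxsize: int) -> int:
    """Return the largest queue size seen while a slow consumer drains it.

    maxsize=0 means an unbounded queue (asyncio's convention).
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(n_messages):
            await queue.put(i)  # suspends here while a bounded queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # end-of-stream sentinel

    async def consumer() -> None:
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.001)  # simulate slow processing

    await asyncio.gather(producer(), consumer())
    return peak
```

Comparing `measure_backlog(50, maxsize=5)` with `measure_backlog(50, maxsize=0)` shows the trade-off: the bounded queue holds memory flat by slowing the producer, while the unbounded one lets the backlog grow with the stream.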

Memory management

import json
from typing import AsyncGenerator
from claude_code_sdk import query, ClaudeCodeOptions


class MemoryEfficientStreamer:
    """Batches extracted data and flushes before the batch exceeds a size limit.

    Consumers only receive data at flush time, so use a small limit
    when latency matters more than batching.
    """

    def __init__(self, max_buffer_mb: float = 10):
        self.max_buffer_bytes = max_buffer_mb * 1024 * 1024

    async def stream(
        self,
        prompt: str,
        cwd: str
    ) -> AsyncGenerator[dict, None]:
        buffer = []
        buffer_bytes = 0

        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            # Keep only the fields we need; the full message can then be freed
            data = self._extract_data(message)
            # sys.getsizeof() is shallow and ignores nested content, so
            # estimate the size from the serialized form instead.
            data_size = len(json.dumps(data).encode("utf-8"))

            # Flush the batch before it grows past the limit
            if buffer and buffer_bytes + data_size > self.max_buffer_bytes:
                for buffered in buffer:
                    yield buffered
                buffer.clear()
                buffer_bytes = 0

            buffer.append(data)
            buffer_bytes += data_size

        # Flush whatever is left
        for buffered in buffer:
            yield buffered

    def _extract_data(self, message) -> dict:
        """Extrae solo los datos necesarios, descartando el resto."""
        from claude_code_sdk import AssistantMessage
        if isinstance(message, AssistantMessage):
            texts = []
            tools = []
            for block in message.content:
                if hasattr(block, "text"):
                    texts.append(block.text)
                elif hasattr(block, "name"):
                    tools.append(block.name)
            return {"type": "assistant", "texts": texts, "tools": tools}
        return {"type": type(message).__name__}

10. Complete Example: Real-Time Chat

This example combines the techniques from this chapter into a complete chat system with a backend and a frontend.

FastAPI backend (Python)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
from dataclasses import dataclass, field
from typing import Optional
import json
import asyncio
import uuid
from datetime import datetime

app = FastAPI(title="Claude Chat en Tiempo Real")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


@dataclass
class ChatMessage:
    id: str
    role: str  # "user" | "assistant"
    content: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    tool_calls: list[str] = field(default_factory=list)


@dataclass
class ChatSession:
    id: str
    cwd: str
    messages: list[ChatMessage] = field(default_factory=list)
    connections: set[WebSocket] = field(default_factory=set)
    current_agent_task: Optional[asyncio.Task] = None

    def add_connection(self, ws: WebSocket):
        self.connections.add(ws)

    def remove_connection(self, ws: WebSocket):
        self.connections.discard(ws)

    async def broadcast(self, event: dict):
        dead = set()
        for ws in self.connections.copy():
            try:
                await ws.send_text(json.dumps(event))
            except Exception:
                dead.add(ws)
        self.connections -= dead

    async def run_agent(self, user_message: str):
        user_msg = ChatMessage(
            id=str(uuid.uuid4()),
            role="user",
            content=user_message
        )
        self.messages.append(user_msg)
        await self.broadcast({"type": "user_message", "message": user_msg.__dict__})

        assistant_msg = ChatMessage(
            id=str(uuid.uuid4()),
            role="assistant",
            content=""
        )

        await self.broadcast({"type": "assistant_start", "message_id": assistant_msg.id})

        try:
            async for message in query(
                prompt=user_message,
                options=ClaudeCodeOptions(cwd=self.cwd)
            ):
                if isinstance(message, AssistantMessage):
                    for block in message.content:
                        if hasattr(block, "text"):
                            assistant_msg.content += block.text
                            await self.broadcast({
                                "type": "text_delta",
                                "message_id": assistant_msg.id,
                                "delta": block.text
                            })
                        elif hasattr(block, "name"):
                            assistant_msg.tool_calls.append(block.name)
                            await self.broadcast({
                                "type": "tool_use",
                                "message_id": assistant_msg.id,
                                "tool": block.name
                            })

                elif isinstance(message, ResultMessage):
                    await self.broadcast({
                        "type": "assistant_done",
                        "message_id": assistant_msg.id,
                        "duration_ms": message.duration_ms,
                        "cost_usd": message.cost_usd,
                    })

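        except asyncio.CancelledError:
            assistant_msg.content += "\n[Cancelado]"
            await self.broadcast({"type": "assistant_cancelled", "message_id": assistant_msg.id})
        except Exception as e:
            # Without this branch a failed run never ends the turn, and every
            # client stays stuck with the send button disabled. The frontend
            # already handles assistant_cancelled, so it is reused here.
            assistant_msg.content += f"\n[Error: {e}]"
            await self.broadcast({"type": "assistant_cancelled", "message_id": assistant_msg.id})

        self.messages.append(assistant_msg)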


sessions: dict[str, ChatSession] = {}


@app.post("/api/sessions")
async def create_session(cwd: str = "/tmp"):
    session_id = str(uuid.uuid4())
    sessions[session_id] = ChatSession(id=session_id, cwd=cwd)
    return {"session_id": session_id}


@app.get("/api/sessions/{session_id}/history")
async def get_history(session_id: str):
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Sesión no encontrada")
    return {"messages": [m.__dict__ for m in sessions[session_id].messages]}


@app.websocket("/ws/{session_id}")
async def websocket_chat(websocket: WebSocket, session_id: str):
    if session_id not in sessions:
        await websocket.close(code=4004, reason="Sesión no encontrada")
        return

    session = sessions[session_id]
    await websocket.accept()
    session.add_connection(websocket)

    # Enviar historial
    for msg in session.messages:
        await websocket.send_text(json.dumps({"type": "history", "message": msg.__dict__}))

    try:
        while True:
            data = await websocket.receive_text()
            event = json.loads(data)

            if event["type"] == "send_message":
                if session.current_agent_task and not session.current_agent_task.done():
                    session.current_agent_task.cancel()

                session.current_agent_task = asyncio.create_task(
                    session.run_agent(event["content"])
                )

            elif event["type"] == "cancel":
                if session.current_agent_task:
                    session.current_agent_task.cancel()

    except WebSocketDisconnect:
        session.remove_connection(websocket)

Complete HTML/JS frontend

<!DOCTYPE html>
<html lang="es">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Claude Chat en Tiempo Real</title>
    <style>
        * { box-sizing: border-box; margin: 0; padding: 0; }
        body { font-family: -apple-system, sans-serif; display: flex;
               flex-direction: column; height: 100vh; background: #0f0f0f; color: #e0e0e0; }
        #header { background: #1a1a1a; padding: 1rem; border-bottom: 1px solid #333;
                  display: flex; justify-content: space-between; align-items: center; }
        #messages { flex: 1; overflow-y: auto; padding: 1rem; display: flex; flex-direction: column; gap: 1rem; }
        .message { max-width: 80%; padding: 0.75rem 1rem; border-radius: 12px; }
        .message.user { align-self: flex-end; background: #2563eb; }
        .message.assistant { align-self: flex-start; background: #1e1e1e;
                              border: 1px solid #333; white-space: pre-wrap; }
        .tool-indicator { font-size: 0.75rem; color: #4ec9b0; margin-top: 0.5rem; }
        .typing-indicator { display: flex; gap: 4px; padding: 0.5rem; }
        .typing-dot { width: 8px; height: 8px; background: #4ec9b0; border-radius: 50%;
                      animation: typing 1s infinite; }
        .typing-dot:nth-child(2) { animation-delay: 0.2s; }
        .typing-dot:nth-child(3) { animation-delay: 0.4s; }
        @keyframes typing { 0%, 60%, 100% { opacity: 0.3; } 30% { opacity: 1; } }
        #input-area { background: #1a1a1a; padding: 1rem; border-top: 1px solid #333;
                      display: flex; gap: 0.5rem; }
        #input { flex: 1; background: #2a2a2a; border: 1px solid #444; color: #e0e0e0;
                 border-radius: 8px; padding: 0.75rem; font-size: 1rem; resize: none; }
        button { background: #2563eb; color: white; border: none; padding: 0.75rem 1.5rem;
                 border-radius: 8px; cursor: pointer; font-size: 1rem; }
        button:disabled { opacity: 0.5; cursor: not-allowed; }
        #cancel-btn { background: #dc2626; }
        #status { font-size: 0.8rem; color: #666; }
    </style>
</head>
<body>
    <div id="header">
        <h2>Claude Agent Chat</h2>
        <span id="status">Desconectado</span>
    </div>
    <div id="messages"></div>
    <div id="input-area">
        <textarea id="input" rows="2" placeholder="Escribe tu mensaje..."></textarea>
        <button id="send-btn" onclick="sendMessage()">Enviar</button>
        <button id="cancel-btn" onclick="cancelAgent()" disabled>Cancelar</button>
    </div>

    <script>
        let ws = null;
        let sessionId = null;
        let currentAssistantEl = null;
        const messagesEl = document.getElementById("messages");
        const statusEl = document.getElementById("status");
        const sendBtn = document.getElementById("send-btn");
        const cancelBtn = document.getElementById("cancel-btn");

        async function init() {
            const resp = await fetch("/api/sessions", { method: "POST" });
            const data = await resp.json();
            sessionId = data.session_id;
            connectWS();
        }

        function connectWS() {
            ws = new WebSocket(`ws://localhost:8000/ws/${sessionId}`);

            ws.onopen = () => {
                statusEl.textContent = "Conectado";
                statusEl.style.color = "#4ec9b0";
            };

            ws.onclose = () => {
                statusEl.textContent = "Desconectado - reconectando...";
                statusEl.style.color = "#f44747";
                setTimeout(connectWS, 2000);
            };

            ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                handleWSEvent(data);
            };
        }

        function handleWSEvent(event) {
            switch (event.type) {
                case "user_message":
                    addMessage("user", event.message.content);
                    break;

                case "assistant_start":
                    currentAssistantEl = addMessage("assistant", "");
                    sendBtn.disabled = true;
                    cancelBtn.disabled = false;
                    showTypingIndicator();
                    break;

                case "text_delta":
                    if (currentAssistantEl) {
                        hideTypingIndicator();
                        const textEl = currentAssistantEl.querySelector(".text-content");
                        textEl.textContent += event.delta;
                        messagesEl.scrollTop = messagesEl.scrollHeight;
                    }
                    break;

                case "tool_use":
                    if (currentAssistantEl) {
                        const toolEl = currentAssistantEl.querySelector(".tool-indicator") ||
                                       createToolIndicator(currentAssistantEl);
                        toolEl.textContent += `[${event.tool}] `;
                    }
                    break;

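                case "assistant_done":
                case "assistant_cancelled":
                    // Remove the typing indicator even if no text_delta arrived
                    // (for example, when the run is cancelled right away).
                    hideTypingIndicator();
                    sendBtn.disabled = false;
                    cancelBtn.disabled = true;
                    currentAssistantEl = null;
                    break;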

                case "history":
                    addMessage(event.message.role, event.message.content);
                    break;
            }
        }

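        function addMessage(role, content) {
            const el = document.createElement("div");
            el.className = `message ${role}`;

            // Set the text via textContent, not innerHTML, so message content
            // is never interpreted as markup (avoids HTML/script injection).
            const textEl = document.createElement("div");
            textEl.className = "text-content";
            textEl.textContent = content;
            el.appendChild(textEl);

            if (role === "assistant") {
                const toolEl = document.createElement("div");
                toolEl.className = "tool-indicator";
                el.appendChild(toolEl);
            }

            messagesEl.appendChild(el);
            messagesEl.scrollTop = messagesEl.scrollHeight;
            return el;
        }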

        let typingEl = null;
        function showTypingIndicator() {
            typingEl = document.createElement("div");
            typingEl.className = "message assistant";
            typingEl.innerHTML = `
                <div class="typing-indicator">
                    <div class="typing-dot"></div>
                    <div class="typing-dot"></div>
                    <div class="typing-dot"></div>
                </div>
            `;
            messagesEl.appendChild(typingEl);
        }

        function hideTypingIndicator() {
            if (typingEl) {
                typingEl.remove();
                typingEl = null;
            }
        }

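        // Fallback for an assistant message that has no tool indicator yet:
        // create one, attach it to the message, and return it.
        function createToolIndicator(parent) {
            const el = document.createElement("div");
            el.className = "tool-indicator";
            parent.appendChild(el);
            return el;
        }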

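        function sendMessage() {
            const input = document.getElementById("input");
            const content = input.value.trim();
            // Only send when there is text and the socket is open;
            // calling send() on a socket that is still connecting throws.
            if (!content || !ws || ws.readyState !== WebSocket.OPEN) return;

            ws.send(JSON.stringify({ type: "send_message", content }));
            input.value = "";
        }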

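        function cancelAgent() {
            // Skip the request if the socket is closed or still reconnecting.
            if (ws && ws.readyState === WebSocket.OPEN) {
                ws.send(JSON.stringify({ type: "cancel" }));
            }
        }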

        document.getElementById("input").addEventListener("keydown", (e) => {
            if (e.key === "Enter" && !e.shiftKey) {
                e.preventDefault();
                sendMessage();
            }
        });

        init();
    </script>
</body>
</html>
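
The client above retries a closed WebSocket after a fixed 2-second delay. A common refinement is exponential backoff, so that many clients do not hammer a server that is restarting. The sketch below is only an illustration: `backoffDelay` and its options (`baseMs`, `maxMs`, `jitter`) are hypothetical names that are not part of the example above or of any library.

```javascript
// Hypothetical helper (an assumption, not part of the chat example):
// delay in milliseconds before reconnect attempt `attempt` (0-based),
// doubling each time up to `maxMs`.
function backoffDelay(attempt, { baseMs = 1000, maxMs = 30000, jitter = 0 } = {}) {
    const exp = Math.min(maxMs, baseMs * 2 ** attempt);
    // `jitter` is a fraction in [0, 1]: the delay is drawn from
    // [exp * (1 - jitter), exp]. A value of 0 disables randomness.
    const spread = exp * jitter;
    return Math.round(exp - spread + Math.random() * spread);
}

// How connectWS could use it (browser-only, so shown as comments):
//   let attempts = 0;
//   ws.onopen  = () => { attempts = 0; };
//   ws.onclose = () => setTimeout(connectWS, backoffDelay(attempts++, { jitter: 0.5 }));
```

Resetting the attempt counter in `onopen` means a healthy connection always starts again from the shortest delay.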

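This complete example implements a real-time chat client with incremental rendering of text deltas, a typing indicator, tool-use indicators, cancellation of a running agent, replay of message history, and automatic reconnection.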
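
The event protocol handled by `handleWSEvent` can also be modeled without the DOM, which makes it easier to reason about. The following is a minimal sketch under stated assumptions: `reduceChatEvent`, `initialChatState`, and the state shape are hypothetical, and only the event types and fields (`message.content`, `message.role`, `delta`, `tool`) come from the client code above.

```javascript
// Hypothetical state shape (an assumption): `messages` holds
// { role, content, tools } objects and `current` is the index of the
// assistant message being streamed, or null when idle.
const initialChatState = { messages: [], current: null };

// Pure reducer: returns a new state for each server event and never
// mutates the state it receives.
function reduceChatEvent(state, event) {
    const messages = state.messages.map((m) => ({ ...m, tools: [...m.tools] }));
    const active = state.current === null ? null : messages[state.current];

    switch (event.type) {
        case "user_message":
            messages.push({ role: "user", content: event.message.content, tools: [] });
            return { messages, current: state.current };
        case "history":
            messages.push({ role: event.message.role, content: event.message.content, tools: [] });
            return { messages, current: state.current };
        case "assistant_start":
            messages.push({ role: "assistant", content: "", tools: [] });
            return { messages, current: messages.length - 1 };
        case "text_delta":
            if (active) active.content += event.delta;
            return { messages, current: state.current };
        case "tool_use":
            if (active) active.tools.push(event.tool);
            return { messages, current: state.current };
        case "assistant_done":
        case "assistant_cancelled":
            return { messages, current: null };
        default:
            // Unknown event types are ignored, as in the original switch.
            return state;
    }
}
```

A list of recorded events can then be folded with `events.reduce(reduceChatEvent, initialChatState)`, and the resulting state rendered or inspected in one step.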


Chapter Summary

mindmap
  root((Streaming))
    Why
      Perceived latency
      User experience
      Immediate feedback
    Native SDK
      AsyncGenerator
      AssistantMessage
      ResultMessage
    Terminal
      Rich library
      Spinners
      Progress bars
    Web
      SSE
        FastAPI
        Express
        HTML client
      WebSocket
        Bidirectional
        Multiple clients
        Reconnection
    Advanced
      Cancellation
      Backpressure
      File logging
      Progress tracking

In the next chapter you will learn how to test these streaming agents robustly, without paying for real API calls.