
langchain-webhooks-events

Implement LangChain callback handlers, streaming, webhooks, Server-Sent Events (SSE), and WebSocket integration. Trigger: "langchain callbacks", "langchain webhooks", "langchain events", "langchain streaming", "langchain SSE", "WebSocket LLM".

LangChain Webhooks & Events

Overview

Event-driven patterns for LangChain: custom callback handlers for lifecycle hooks, webhook dispatching, Server-Sent Events (SSE) for streaming, WebSocket integration, and event aggregation for tracing.

Callback Handler Architecture

LangChain emits events at every stage of chain/agent execution. Custom handlers can observe, log, stream, or dispatch these events.

chain.invoke()
  ├── handleChainStart()
  │   ├── handleLLMStart()
  │   │   ├── handleLLMNewToken()  // streaming
  │   │   └── handleLLMEnd()
  │   ├── handleToolStart()
  │   │   └── handleToolEnd()
  │   └── handleRetrieverStart()
  │       └── handleRetrieverEnd()
  └── handleChainEnd()

Custom Callback Handler

import { BaseCallbackHandler } from "@langchain/core/callbacks/base";
import { ChatOpenAI } from "@langchain/openai";

class WebhookHandler extends BaseCallbackHandler {
  name = "WebhookHandler";

  constructor(private webhookUrl: string) {
    super();
  }

  async handleLLMStart(llm: any, prompts: string[]) {
    await this.send("llm_start", {
      // llm.id is the serialized class path; index 2 is typically the provider segment
      model: llm?.id?.[2],
      promptCount: prompts.length,
    });
  }

  async handleLLMEnd(output: any) {
    await this.send("llm_end", {
      tokenUsage: output.llmOutput?.tokenUsage,
    });
  }

  async handleLLMError(error: Error) {
    await this.send("llm_error", {
      error: error.message,
    });
  }

  async handleToolStart(_tool: any, input: string) {
    await this.send("tool_start", { input: input.slice(0, 200) });
  }

  async handleToolEnd(output: string) {
    await this.send("tool_end", { output: output.slice(0, 200) });
  }

  private async send(event: string, data: Record<string, any>) {
    try {
      await fetch(this.webhookUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          event,
          data,
          timestamp: new Date().toISOString(),
        }),
        signal: AbortSignal.timeout(5000),
      });
    } catch (e) {
      // Don't let webhook failures break the chain
      console.warn(`Webhook error: ${e}`);
    }
  }
}

// Attach to model
const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  callbacks: [new WebhookHandler("https://api.example.com/webhook")],
});
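
Each handler above awaits send, so depending on how the runtime awaits callbacks, a slow endpoint can add up to the 5-second timeout per event. One way to decouple delivery from the chain is a small in-memory queue. The sketch below is an assumption, not part of LangChain: WebhookQueue, Sender, enqueue, and flush are hypothetical names, and the sender is injected so the queue can wrap the fetch call shown above.

```typescript
// Hypothetical helper, not a LangChain API: starts webhook deliveries
// without making the caller wait for the HTTP request to finish.
type Sender = (event: string, data: Record<string, unknown>) => Promise<void>;

class WebhookQueue {
  private inFlight = new Set<Promise<void>>();
  private sender: Sender;

  constructor(sender: Sender) {
    this.sender = sender;
  }

  // Schedule delivery and return immediately; failures are logged, never thrown.
  enqueue(event: string, data: Record<string, unknown>): void {
    const task: Promise<void> = Promise.resolve()
      .then(() => this.sender(event, data))
      .catch((e) => console.warn(`Webhook error: ${e}`))
      .finally(() => {
        this.inFlight.delete(task);
      });
    this.inFlight.add(task);
  }

  // Number of deliveries that have not settled yet.
  get pending(): number {
    return this.inFlight.size;
  }

  // Wait for outstanding deliveries, e.g. before a short-lived process exits.
  async flush(): Promise<void> {
    await Promise.all(this.inFlight);
  }
}

// Usage inside a handler (sketch):
//   handleLLMEnd(output: any) {
//     this.queue.enqueue("llm_end", { tokenUsage: output.llmOutput?.tokenUsage });
//   }
```

The trade-off is that events still in flight are lost if the process exits before flush() completes, so this suits telemetry better than anything that needs guaranteed delivery.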

Server-Sent Events (SSE) Endpoint

import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";

const app = express();
app.use(express.json());

const chain = ChatPromptTemplate.fromTemplate("{input}")
  .pipe(new ChatOpenAI({ model: "gpt-4o-mini", streaming: true }))
  .pipe(new StringOutputParser());

app.post("/api/chat/stream", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  try {
    const stream = await chain.stream({ input: req.body.input });

    for await (const chunk of stream) {
      if (res.destroyed) break;  // client disconnected
      res.write(`data: ${JSON.stringify({ text: chunk })}\n\n`);
    }

    res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
  } catch (error: any) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
  }

  res.end();
});
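
The endpoint above frames every message by hand as a data: line followed by a blank line. A minimal sketch of helpers that centralize that framing is shown below; the function names (sseJson, sseText, sseEvent) are illustrative assumptions, not part of Express or LangChain.

```typescript
// Hypothetical helpers for building SSE frames; names are illustrative.

// Serialize a JSON payload as one SSE message. JSON.stringify escapes
// newlines, so the payload always fits on a single "data:" line.
function sseJson(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

// Serialize raw text, which may span several lines: each line needs its
// own "data:" prefix, and a blank line terminates the message.
function sseText(text: string): string {
  const lines = text.split(/\r\n|\r|\n/).map((line) => `data: ${line}`);
  return lines.join("\n") + "\n\n";
}

// Prefix a message with an event name so clients can tell message
// kinds apart without inspecting the payload.
function sseEvent(name: string, payload: unknown): string {
  return `event: ${name}\n${sseJson(payload)}`;
}
```

With these, the write calls in the handler could become res.write(sseJson({ text: chunk })), which keeps the wire format in one place.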

Client-Side SSE Consumer

// Browser TypeScript
async function streamChat(input: string) {
  const response = await fetch("/api/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ input }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A network chunk can end mid-message, so buffer until a blank line
    // marks the end of each SSE message.
    buffer += decoder.decode(value, { stream: true });
    const messages = buffer.split("\n\n");
    buffer = messages.pop() ?? "";  // keep the incomplete tail

    for (const message of messages) {
      if (!message.startsWith("data: ")) continue;
      const data = JSON.parse(message.slice(6));
      if (data.done) return;
      if (data.error) throw new Error(data.error);  // server reported a failure
      if (data.text) document.getElementById("output")!.textContent += data.text;
    }
  }
}

WebSocket Streaming

import { WebSocketServer } from "ws";
import { ChatOpenAI } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";

const wss = new WebSocketServer({ port: 8080 });

const chain = ChatPromptTemplate.fromTemplate("{input}")
  .pipe(new ChatOpenAI({ model: "gpt-4o-mini", streaming: true }))
  .pipe(new StringOutputParser());

wss.on("connection", (ws) => {
  ws.on("message", async (raw) => {
    try {
      // Parse inside the try block so malformed JSON is reported to the client
      const { input } = JSON.parse(raw.toString());

      const stream = await chain.stream({ input });
      for await (const chunk of stream) {
        if (ws.readyState !== ws.OPEN) return;  // client disconnected
        ws.send(JSON.stringify({ type: "token", text: chunk }));
      }
      ws.send(JSON.stringify({ type: "done" }));
    } catch (error: any) {
      ws.send(JSON.stringify({ type: "error", message: error.message }));
    }
  });
});
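
The server above emits three message shapes: token, done, and error. A minimal sketch of the consuming side follows, assuming a client that accumulates text in memory. ChatState, applyMessage, and parseServerMessage are hypothetical names introduced for illustration.

```typescript
// Message shapes taken from the server code above.
type ServerMessage =
  | { type: "token"; text: string }
  | { type: "done" }
  | { type: "error"; message: string };

// Hypothetical client-side state; the field names are illustrative.
interface ChatState {
  text: string;
  status: "streaming" | "done" | "error";
  error?: string;
}

const initialState: ChatState = { text: "", status: "streaming" };

// Pure reducer: fold one server message into the accumulated state.
function applyMessage(state: ChatState, msg: ServerMessage): ChatState {
  switch (msg.type) {
    case "token":
      return { ...state, text: state.text + msg.text };
    case "done":
      return { ...state, status: "done" };
    case "error":
      return { ...state, status: "error", error: msg.message };
  }
}

// Parse a raw frame defensively; returns null for anything unrecognized.
function parseServerMessage(raw: string): ServerMessage | null {
  try {
    const msg = JSON.parse(raw);
    if (msg?.type === "token" && typeof msg.text === "string") return msg;
    if (msg?.type === "done") return msg;
    if (msg?.type === "error" && typeof msg.message === "string") return msg;
    return null;
  } catch {
    return null;
  }
}
```

In a browser this could be wired to socket.onmessage by parsing event.data and, when the result is not null, replacing the current state with applyMessage(state, message).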

Event Aggregation (Trace Collection)

import { BaseCallbackHandler } from "@langchain/core/callbacks/base";

interface TraceEvent {
  event: string;
  timestamp: number;
  data: Record<string, any>;
  runId: string;
}

class TraceAggregator extends BaseCallbackHandler {
  name = "TraceAggregator";
  events: TraceEvent[] = [];

  handleChainStart(_chain: any, inputs: any, runId: string) {
    this.log("chain_start", runId, { inputKeys: Object.keys(inputs) });
  }

  handleChainEnd(outputs: any, runId: string) {
    this.log("chain_end", runId, { outputKeys: Object.keys(outputs) });
  }

  handleLLMStart(llm: any, _prompts: string[], runId: string) {
    // llm.id is the serialized class path; index 2 is typically the provider segment
    this.log("llm_start", runId, { model: llm?.id?.[2] });
  }

  handleLLMEnd(output: any, runId: string) {
    this.log("llm_end", runId, {
      tokens: output.llmOutput?.tokenUsage?.totalTokens,
    });
  }

  private log(event: string, runId: string, data: Record<string, any>) {
    this.events.push({ event, timestamp: Date.now(), data, runId });
  }

  getTrace() {
    return {
      events: this.events,
      totalEvents: this.events.length,
      durationMs: this.events.length > 1
        ? this.events[this.events.length - 1].timestamp - this.events[0].timestamp
        : 0,
    };
  }
}

// Usage (chain is the prompt | model | parser pipeline from the SSE example)
const tracer = new TraceAggregator();
await chain.invoke({ input: "test" }, { callbacks: [tracer] });
console.log(tracer.getTrace());
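
getTrace() reports only the span between the first and last event. Because every event carries a runId, start and end events can also be paired to get a duration per run. The sketch below assumes the event names used by TraceAggregator (a _start or _end suffix); runDurations is a hypothetical helper, not a LangChain API.

```typescript
// Same shape as the TraceEvent interface above.
interface TraceEvent {
  event: string;
  timestamp: number;
  data: Record<string, any>;
  runId: string;
}

interface RunDuration {
  kind: string; // "chain", "llm", ... derived from the event name
  ms: number;
}

// Pair each "*_start" event with the "*_end" event for the same runId
// and report the elapsed time. Runs without an end event are skipped.
function runDurations(events: TraceEvent[]): Map<string, RunDuration> {
  const starts = new Map<string, TraceEvent>();
  const durations = new Map<string, RunDuration>();

  for (const e of events) {
    if (e.event.endsWith("_start")) {
      starts.set(e.runId, e);
    } else if (e.event.endsWith("_end")) {
      const start = starts.get(e.runId);
      if (start) {
        durations.set(e.runId, {
          kind: start.event.replace(/_start$/, ""),
          ms: e.timestamp - start.timestamp,
        });
      }
    }
  }
  return durations;
}
```

Calling runDurations(tracer.events) after an invocation would then show how much of the total time was spent in each run.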

Error Handling

| Error | Cause | Fix |
|---|---|---|
| Webhook timeout | Slow endpoint | Use AbortSignal.timeout(), make async |
| WebSocket disconnect | Client closed | Check ws.readyState before sending |
| SSE connection reset | Proxy timeout | Add keep-alive pings every 15s |
| Events not captured | Callback not passed | Add to callbacks array in invoke() |
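
The keep-alive fix for SSE connection resets can be sketched as a timer that writes an SSE comment line, which clients ignore. This is a minimal sketch under assumptions: SseSink, SSE_PING, and startKeepAlive are hypothetical names, and the 15-second default simply mirrors the interval suggested in the table.

```typescript
// Minimal writable surface so the sketch works with Express's `res`
// or any object exposing write(); the interface name is illustrative.
interface SseSink {
  write(chunk: string): unknown;
}

// A line starting with ":" is an SSE comment. Clients discard it, but
// the traffic helps keep an otherwise idle connection from timing out.
const SSE_PING = ": ping\n\n";

// Start periodic pings; call the returned function to stop them.
function startKeepAlive(sink: SseSink, intervalMs = 15_000): () => void {
  const timer = setInterval(() => sink.write(SSE_PING), intervalMs);
  return () => clearInterval(timer);
}

// Usage inside the SSE route (sketch):
//   const stop = startKeepAlive(res);
//   res.on("close", stop);   // stop pinging when the client goes away
//   ...stream chunks...
//   stop();
//   res.end();
```

Because the client consumer only processes lines that start with data:, the ping comments pass through without affecting the rendered output.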

Resources

  • LangChain Callbacks
  • LangChain Streaming
  • Server-Sent Events (MDN)
  • ws Package

Next Steps

Use langchain-observability for comprehensive production monitoring.

Repository
jeremylongshore/claude-code-plugins-plus-skills