CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-langfuse

Observability and analytics platform for LLM applications with hierarchical tracing, prompt management, dataset operations, and OpenAI integration

Overview
Eval results
Files

openai-integration.mddocs/

OpenAI Integration

Automatic tracing for OpenAI SDK calls using a proxy wrapper. The observeOpenAI function wraps your OpenAI client to automatically capture all API calls with inputs, outputs, token usage, and timing information.

Capabilities

ObserveOpenAI Function

Wraps an OpenAI SDK instance with automatic Langfuse tracing.

/**
 * Wraps an OpenAI SDK instance with automatic Langfuse tracing
 * @param sdk - The OpenAI SDK instance to wrap
 * @param langfuseConfig - Optional tracing configuration
 * @returns Wrapped SDK with tracing and lifecycle methods
 */
function observeOpenAI<SDKType extends object>(
  sdk: SDKType,
  langfuseConfig?: LangfuseConfig
): SDKType & LangfuseExtension;

interface LangfuseExtension {
  /**
   * Flushes all pending Langfuse events
   * @returns Promise that resolves when all events are sent
   */
  flushAsync(): Promise<void>;

  /**
   * Shuts down the Langfuse client
   * @returns Promise that resolves when shutdown is complete
   */
  shutdownAsync(): Promise<void>;
}

Usage Example:

import OpenAI from 'openai';
import { observeOpenAI } from 'langfuse';

const client = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY
});

// Wrap the client with tracing
const tracedClient = observeOpenAI(client, {
  traceName: 'openai-chat',
  userId: 'user-123',
  sessionId: 'session-456'
});

// Use normally - all calls are automatically traced
const response = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'You are a helpful assistant' },
    { role: 'user', content: 'Hello!' }
  ],
  model: 'gpt-3.5-turbo'
});

// Flush events to Langfuse
await tracedClient.flushAsync();

Configuration Options

The LangfuseConfig type supports two modes: creating new traces or nesting under existing traces.

type LangfuseConfig = (LangfuseNewTraceConfig | LangfuseWithParentConfig) & {
  /** Optional name for the generation */
  generationName?: string;
  /** Optional prompt client for linking */
  langfusePrompt?: LangfusePromptClient;
};

interface LangfuseNewTraceConfig {
  /** Custom trace ID */
  traceId?: string;
  /** Trace name */
  traceName?: string;
  /** Session ID */
  sessionId?: string;
  /** User ID */
  userId?: string;
  /** Release version */
  release?: string;
  /** Version identifier */
  version?: string;
  /** Custom metadata */
  metadata?: any;
  /** Tags for filtering */
  tags?: string[];
  /** Client initialization parameters */
  clientInitParams?: LangfuseInitParams;
}

interface LangfuseWithParentConfig {
  /** Parent trace, span, or generation to nest under */
  parent: LangfuseParent;
  /** Custom metadata */
  metadata?: any;
  /** Version identifier */
  version?: string;
  /** Prompt name (deprecated, use langfusePrompt) */
  promptName?: string;
  /** Prompt version (deprecated, use langfusePrompt) */
  promptVersion?: number;
}

type LangfuseParent =
  | LangfuseTraceClient
  | LangfuseSpanClient
  | LangfuseGenerationClient;

interface LangfuseInitParams {
  /** Langfuse public key */
  publicKey?: string;
  /** Langfuse secret key */
  secretKey?: string;
  /** Base URL for Langfuse API */
  baseUrl?: string;
  /** Additional Langfuse options */
  [key: string]: any;
}

Usage Patterns

Basic Tracing

Simple wrapper for automatic tracing of all OpenAI calls.

import OpenAI from 'openai';
import { observeOpenAI } from 'langfuse';

const client = new OpenAI();
const tracedClient = observeOpenAI(client);

// All methods are automatically traced
const completion = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Hello!' }],
  model: 'gpt-3.5-turbo'
});

await tracedClient.flushAsync();

Named Traces

Provide custom trace names for better organization.

const tracedClient = observeOpenAI(client, {
  traceName: 'customer-support-chat',
  userId: 'user-123',
  sessionId: 'session-456',
  metadata: {
    environment: 'production',
    region: 'us-east-1'
  },
  tags: ['support', 'chat']
});

const response = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'I need help' }],
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Custom Generation Names

Override the default generation name for specific calls.

const tracedClient = observeOpenAI(client, {
  traceName: 'document-processing',
  generationName: 'document-summarization'
});

const summary = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Summarize the following document' },
    { role: 'user', content: documentText }
  ],
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Nesting Under Existing Traces

Nest OpenAI calls under existing Langfuse traces for hierarchical tracing.

import { Langfuse, observeOpenAI } from 'langfuse';

const langfuse = new Langfuse();
const client = new OpenAI();

// Create a parent trace
const trace = langfuse.trace({
  name: 'rag-pipeline',
  userId: 'user-123'
});

// Create a span for the retrieval step
const retrievalSpan = trace.span({
  name: 'document-retrieval'
});

// ... perform retrieval ...

retrievalSpan.end({
  output: { documents: retrievedDocs }
});

// Wrap OpenAI client to nest under the trace
const tracedClient = observeOpenAI(client, {
  parent: trace,
  generationName: 'answer-generation'
});

// OpenAI call will be nested under the trace
const response = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Answer based on the context' },
    { role: 'user', content: query }
  ],
  model: 'gpt-4'
});

trace.update({
  output: { answer: response.choices[0].message.content }
});

await langfuse.flushAsync();

Nesting Under Spans

Nest OpenAI calls under specific spans for detailed tracing.

const trace = langfuse.trace({ name: 'multi-step-process' });

const step1Span = trace.span({ name: 'step-1' });

// Nest OpenAI call under this span
const tracedClient = observeOpenAI(client, {
  parent: step1Span,
  generationName: 'step-1-generation'
});

const result = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Step 1 prompt' }],
  model: 'gpt-3.5-turbo'
});

step1Span.end({
  output: { result: result.choices[0].message.content }
});

await langfuse.flushAsync();

Linking Prompts

Link prompt templates to OpenAI generations for version tracking.

import { Langfuse, observeOpenAI } from 'langfuse';

const langfuse = new Langfuse();
const client = new OpenAI();

// Fetch a prompt
const prompt = await langfuse.getPrompt('chat-template', undefined, {
  type: 'chat'
});

// Compile the prompt
const messages = prompt.compile(
  { topic: 'AI', tone: 'professional' },
  { history: [] }
);

// Wrap OpenAI with prompt linking
const tracedClient = observeOpenAI(client, {
  traceName: 'templated-chat',
  langfusePrompt: prompt
});

// The generation will be linked to the prompt version
const response = await tracedClient.chat.completions.create({
  messages: messages,
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Streaming Responses

Automatic tracing works with streaming responses.

const tracedClient = observeOpenAI(client, {
  traceName: 'streaming-chat'
});

const stream = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Tell me a story' }],
  model: 'gpt-4',
  stream: true
});

let fullContent = '';

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  fullContent += content;
  process.stdout.write(content);
}

// Streaming responses are automatically captured
await tracedClient.flushAsync();

Multiple OpenAI Calls

Each wrapped client can make multiple calls, all traced under the same configuration.

const tracedClient = observeOpenAI(client, {
  traceName: 'multi-call-workflow',
  sessionId: 'session-789'
});

// First call
const classification = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Classify the intent' },
    { role: 'user', content: userMessage }
  ],
  model: 'gpt-3.5-turbo'
});

// Second call (both under same trace)
const response = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Generate response' },
    { role: 'user', content: userMessage }
  ],
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Embeddings and Other Methods

All OpenAI SDK methods are automatically traced, not just chat completions.

const tracedClient = observeOpenAI(client, {
  traceName: 'embedding-pipeline'
});

// Embeddings are automatically traced
const embeddings = await tracedClient.embeddings.create({
  input: 'Text to embed',
  model: 'text-embedding-ada-002'
});

// Completions are traced
const completion = await tracedClient.completions.create({
  prompt: 'Once upon a time',
  model: 'gpt-3.5-turbo-instruct',
  max_tokens: 100
});

await tracedClient.flushAsync();

Custom Client Initialization

Provide custom Langfuse client initialization parameters.

const tracedClient = observeOpenAI(client, {
  traceName: 'custom-config',
  clientInitParams: {
    publicKey: 'custom-public-key',
    secretKey: 'custom-secret-key',
    baseUrl: 'https://custom-langfuse.com',
    flushAt: 1, // Flush after every event
    flushInterval: 5000
  }
});

const response = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Hello' }],
  model: 'gpt-3.5-turbo'
});

await tracedClient.flushAsync();

Complete OpenAI Integration Example

import OpenAI from 'openai';
import { Langfuse, observeOpenAI } from 'langfuse';

// Initialize clients
const langfuse = new Langfuse();
const openai = new OpenAI();

// Create a parent trace for the entire workflow
const trace = langfuse.trace({
  name: 'rag-qa-system',
  userId: 'user-456',
  sessionId: 'session-123',
  tags: ['production', 'qa']
});

// Step 1: Generate query embedding
const embeddingClient = observeOpenAI(openai, {
  parent: trace,
  generationName: 'query-embedding'
});

const queryEmbedding = await embeddingClient.embeddings.create({
  input: 'What is machine learning?',
  model: 'text-embedding-ada-002'
});

// Step 2: Retrieve documents (simulated)
const retrievalSpan = trace.span({
  name: 'document-retrieval',
  input: { query: 'What is machine learning?' }
});

const documents = await retrieveDocuments(queryEmbedding.data[0].embedding);

retrievalSpan.end({
  output: { documentCount: documents.length }
});

// Step 3: Generate answer with context
const prompt = await langfuse.getPrompt('qa-with-context', undefined, {
  type: 'chat'
});

const messages = prompt.compile(
  {
    context: documents.join('\n'),
    question: 'What is machine learning?'
  },
  { history: [] }
);

const answerClient = observeOpenAI(openai, {
  parent: trace,
  generationName: 'answer-generation',
  langfusePrompt: prompt
});

const answer = await answerClient.chat.completions.create({
  messages: messages,
  model: 'gpt-4',
  temperature: 0.7,
  max_tokens: 500
});

// Update trace with final output
trace.update({
  output: {
    answer: answer.choices[0].message.content,
    model: 'gpt-4',
    promptVersion: prompt.version
  }
});

// Add a quality score
trace.score({
  name: 'answer-quality',
  value: 0.95,
  comment: 'High quality answer with proper context'
});

// Flush all events
await langfuse.flushAsync();

// Get trace URL
console.log('View trace:', trace.getTraceUrl());

Error Handling

The wrapper preserves errors from the OpenAI SDK while still capturing trace information.

const tracedClient = observeOpenAI(client, {
  traceName: 'error-handling-example'
});

try {
  const response = await tracedClient.chat.completions.create({
    messages: [{ role: 'user', content: 'Hello' }],
    model: 'invalid-model' // This will cause an error
  });
} catch (error) {
  console.error('OpenAI error:', error);
  // Error is captured in the trace with ERROR level
} finally {
  await tracedClient.flushAsync();
}

Performance Considerations

The observeOpenAI wrapper adds minimal overhead:

  • Proxy Pattern: Uses JavaScript Proxy for transparent method interception
  • Async Tracing: Events are queued and sent asynchronously without blocking OpenAI calls
  • Batching: Multiple events are batched together for efficient network usage
  • Caching: Prompt caching reduces API calls to Langfuse

Best Practices:

// Good: Create one wrapper per workflow
const tracedClient = observeOpenAI(client, { traceName: 'workflow' });
await tracedClient.chat.completions.create(/* ... */);
await tracedClient.chat.completions.create(/* ... */);
await tracedClient.flushAsync();

// Avoid: Creating multiple wrappers for single calls
// This works but creates unnecessary overhead
const client1 = observeOpenAI(client, { traceName: 'call1' });
await client1.chat.completions.create(/* ... */);
await client1.flushAsync();

const client2 = observeOpenAI(client, { traceName: 'call2' });
await client2.chat.completions.create(/* ... */);
await client2.flushAsync();

Install with Tessl CLI

npx tessl i tessl/npm-langfuse

docs

configuration.md

datasets.md

index.md

media.md

openai-integration.md

prompts.md

public-api.md

tracing.md

tile.json