Observability and analytics platform for LLM applications with hierarchical tracing, prompt management, dataset operations, and OpenAI integration
Automatic tracing for OpenAI SDK calls using a proxy wrapper. The observeOpenAI function wraps your OpenAI client to automatically capture all API calls with inputs, outputs, token usage, and timing information.
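The proxy-wrapping technique itself can be illustrated with a plain JavaScript `Proxy`. The sketch below is hypothetical and heavily simplified (it times top-level methods only and records to an in-memory log); the real langfuse wrapper also recurses into nested namespaces like `chat.completions` and captures inputs, outputs, and token usage:

```typescript
// Hypothetical sketch: intercept a client's method calls, time them,
// and record a trace entry before returning the result.
// This is NOT the langfuse implementation, just the general technique.

type TraceRecord = { method: string; durationMs: number };

function observeClient<T extends object>(client: T, log: TraceRecord[]): T {
  return new Proxy(client, {
    get(target, prop, receiver) {
      const value = Reflect.get(target, prop, receiver);
      if (typeof value !== 'function') return value; // pass plain properties through
      return async (...args: unknown[]) => {
        const start = Date.now();
        const result = await value.apply(target, args);
        log.push({ method: String(prop), durationMs: Date.now() - start });
        return result;
      };
    },
  });
}

// Stand-in client instead of a real OpenAI SDK instance
const fakeClient = {
  async complete(prompt: string): Promise<string> {
    return `echo: ${prompt}`;
  },
};

const log: TraceRecord[] = [];
const traced = observeClient(fakeClient, log);
const out = await traced.complete('hi'); // delegates to fakeClient, records one entry
```

Because the proxy delegates every call, the wrapped client keeps the original's full API surface, which is why `observeOpenAI` can return `SDKType & LangfuseExtension`.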
Wraps an OpenAI SDK instance with automatic Langfuse tracing.
/**
 * Wraps an OpenAI SDK instance with automatic Langfuse tracing
 * @param sdk - The OpenAI SDK instance to wrap
 * @param langfuseConfig - Optional tracing configuration
 * @returns Wrapped SDK with tracing and lifecycle methods
 */
function observeOpenAI<SDKType extends object>(
  sdk: SDKType,
  langfuseConfig?: LangfuseConfig
): SDKType & LangfuseExtension;

interface LangfuseExtension {
  /**
   * Flushes all pending Langfuse events
   * @returns Promise that resolves when all events are sent
   */
  flushAsync(): Promise<void>;

  /**
   * Shuts down the Langfuse client
   * @returns Promise that resolves when shutdown is complete
   */
  shutdownAsync(): Promise<void>;
}

Usage Example:
import OpenAI from 'openai';
import { observeOpenAI } from 'langfuse';

const client = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY
});

// Wrap the client with tracing
const tracedClient = observeOpenAI(client, {
  traceName: 'openai-chat',
  userId: 'user-123',
  sessionId: 'session-456'
});

// Use normally - all calls are automatically traced
const response = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'You are a helpful assistant' },
    { role: 'user', content: 'Hello!' }
  ],
  model: 'gpt-3.5-turbo'
});

// Flush events to Langfuse
await tracedClient.flushAsync();

The LangfuseConfig type supports two modes: creating new traces or nesting under existing traces.
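For illustration, the two modes look like this (values are hypothetical; the `parent` in the second mode would come from `langfuse.trace(...)` as shown in later examples):

```typescript
// Mode 1: the wrapper creates a new trace from these fields (hypothetical values)
const newTraceConfig = {
  traceName: 'checkout-flow',
  userId: 'user-1',
  tags: ['example'],
};

// Mode 2: nest generations under an existing parent instead
// (assumes `trace` was created earlier, e.g. const trace = langfuse.trace({ name: 'flow' }))
// const nestedConfig = { parent: trace, generationName: 'step-1' };
```

The full field lists for both modes follow below.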
type LangfuseConfig = (LangfuseNewTraceConfig | LangfuseWithParentConfig) & {
  /** Optional name for the generation */
  generationName?: string;
  /** Optional prompt client for linking */
  langfusePrompt?: LangfusePromptClient;
};

interface LangfuseNewTraceConfig {
  /** Custom trace ID */
  traceId?: string;
  /** Trace name */
  traceName?: string;
  /** Session ID */
  sessionId?: string;
  /** User ID */
  userId?: string;
  /** Release version */
  release?: string;
  /** Version identifier */
  version?: string;
  /** Custom metadata */
  metadata?: any;
  /** Tags for filtering */
  tags?: string[];
  /** Client initialization parameters */
  clientInitParams?: LangfuseInitParams;
}

interface LangfuseWithParentConfig {
  /** Parent trace, span, or generation to nest under */
  parent: LangfuseParent;
  /** Custom metadata */
  metadata?: any;
  /** Version identifier */
  version?: string;
  /** Prompt name (deprecated, use langfusePrompt) */
  promptName?: string;
  /** Prompt version (deprecated, use langfusePrompt) */
  promptVersion?: number;
}

type LangfuseParent =
  | LangfuseTraceClient
  | LangfuseSpanClient
  | LangfuseGenerationClient;

interface LangfuseInitParams {
  /** Langfuse public key */
  publicKey?: string;
  /** Langfuse secret key */
  secretKey?: string;
  /** Base URL for Langfuse API */
  baseUrl?: string;
  /** Additional Langfuse options */
  [key: string]: any;
}

Simple wrapper for automatic tracing of all OpenAI calls.
import OpenAI from 'openai';
import { observeOpenAI } from 'langfuse';

const client = new OpenAI();
const tracedClient = observeOpenAI(client);

// All methods are automatically traced
const completion = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Hello!' }],
  model: 'gpt-3.5-turbo'
});

await tracedClient.flushAsync();

Provide custom trace names for better organization.
const tracedClient = observeOpenAI(client, {
  traceName: 'customer-support-chat',
  userId: 'user-123',
  sessionId: 'session-456',
  metadata: {
    environment: 'production',
    region: 'us-east-1'
  },
  tags: ['support', 'chat']
});

const response = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'I need help' }],
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Override the default generation name for specific calls.
const tracedClient = observeOpenAI(client, {
  traceName: 'document-processing',
  generationName: 'document-summarization'
});

const summary = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Summarize the following document' },
    { role: 'user', content: documentText }
  ],
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Nest OpenAI calls under existing Langfuse traces for hierarchical tracing.
import { Langfuse, observeOpenAI } from 'langfuse';

const langfuse = new Langfuse();
const client = new OpenAI();

// Create a parent trace
const trace = langfuse.trace({
  name: 'rag-pipeline',
  userId: 'user-123'
});

// Create a span for the retrieval step
const retrievalSpan = trace.span({
  name: 'document-retrieval'
});

// ... perform retrieval ...
retrievalSpan.end({
  output: { documents: retrievedDocs }
});

// Wrap OpenAI client to nest under the trace
const tracedClient = observeOpenAI(client, {
  parent: trace,
  generationName: 'answer-generation'
});

// OpenAI call will be nested under the trace
const response = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Answer based on the context' },
    { role: 'user', content: query }
  ],
  model: 'gpt-4'
});

trace.update({
  output: { answer: response.choices[0].message.content }
});

await langfuse.flushAsync();

Nest OpenAI calls under specific spans for detailed tracing.
const trace = langfuse.trace({ name: 'multi-step-process' });
const step1Span = trace.span({ name: 'step-1' });

// Nest OpenAI call under this span
const tracedClient = observeOpenAI(client, {
  parent: step1Span,
  generationName: 'step-1-generation'
});

const result = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Step 1 prompt' }],
  model: 'gpt-3.5-turbo'
});

step1Span.end({
  output: { result: result.choices[0].message.content }
});

await langfuse.flushAsync();

Link prompt templates to OpenAI generations for version tracking.
import { Langfuse, observeOpenAI } from 'langfuse';

const langfuse = new Langfuse();
const client = new OpenAI();

// Fetch a prompt
const prompt = await langfuse.getPrompt('chat-template', undefined, {
  type: 'chat'
});

// Compile the prompt
const messages = prompt.compile(
  { topic: 'AI', tone: 'professional' },
  { history: [] }
);

// Wrap OpenAI with prompt linking
const tracedClient = observeOpenAI(client, {
  traceName: 'templated-chat',
  langfusePrompt: prompt
});

// The generation will be linked to the prompt version
const response = await tracedClient.chat.completions.create({
  messages: messages,
  model: 'gpt-4'
});

await tracedClient.flushAsync();

Automatic tracing works with streaming responses.
const tracedClient = observeOpenAI(client, {
  traceName: 'streaming-chat'
});

const stream = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Tell me a story' }],
  model: 'gpt-4',
  stream: true
});

let fullContent = '';
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  fullContent += content;
  process.stdout.write(content);
}

// Streaming responses are automatically captured
await tracedClient.flushAsync();

Each wrapped client can make multiple calls, all traced under the same configuration.
const tracedClient = observeOpenAI(client, {
  traceName: 'multi-call-workflow',
  sessionId: 'session-789'
});

// First call
const classification = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Classify the intent' },
    { role: 'user', content: userMessage }
  ],
  model: 'gpt-3.5-turbo'
});

// Second call (both under same trace)
const response = await tracedClient.chat.completions.create({
  messages: [
    { role: 'system', content: 'Generate response' },
    { role: 'user', content: userMessage }
  ],
  model: 'gpt-4'
});

await tracedClient.flushAsync();

All OpenAI SDK methods are automatically traced, not just chat completions.
const tracedClient = observeOpenAI(client, {
  traceName: 'embedding-pipeline'
});

// Embeddings are automatically traced
const embeddings = await tracedClient.embeddings.create({
  input: 'Text to embed',
  model: 'text-embedding-ada-002'
});

// Completions are traced
const completion = await tracedClient.completions.create({
  prompt: 'Once upon a time',
  model: 'gpt-3.5-turbo-instruct',
  max_tokens: 100
});

await tracedClient.flushAsync();

Provide custom Langfuse client initialization parameters.
const tracedClient = observeOpenAI(client, {
  traceName: 'custom-config',
  clientInitParams: {
    publicKey: 'custom-public-key',
    secretKey: 'custom-secret-key',
    baseUrl: 'https://custom-langfuse.com',
    flushAt: 1, // Flush after every event
    flushInterval: 5000
  }
});

const response = await tracedClient.chat.completions.create({
  messages: [{ role: 'user', content: 'Hello' }],
  model: 'gpt-3.5-turbo'
});

await tracedClient.flushAsync();

A complete example: a RAG question-answering workflow that combines a parent trace, spans, prompt linking, and scoring.

import OpenAI from 'openai';
import { Langfuse, observeOpenAI } from 'langfuse';

// Initialize clients
const langfuse = new Langfuse();
const openai = new OpenAI();

// Create a parent trace for the entire workflow
const trace = langfuse.trace({
  name: 'rag-qa-system',
  userId: 'user-456',
  sessionId: 'session-123',
  tags: ['production', 'qa']
});

// Step 1: Generate query embedding
const embeddingClient = observeOpenAI(openai, {
  parent: trace,
  generationName: 'query-embedding'
});

const queryEmbedding = await embeddingClient.embeddings.create({
  input: 'What is machine learning?',
  model: 'text-embedding-ada-002'
});

// Step 2: Retrieve documents (simulated)
const retrievalSpan = trace.span({
  name: 'document-retrieval',
  input: { query: 'What is machine learning?' }
});

const documents = await retrieveDocuments(queryEmbedding.data[0].embedding);

retrievalSpan.end({
  output: { documentCount: documents.length }
});

// Step 3: Generate answer with context
const prompt = await langfuse.getPrompt('qa-with-context', undefined, {
  type: 'chat'
});

const messages = prompt.compile(
  {
    context: documents.join('\n'),
    question: 'What is machine learning?'
  },
  { history: [] }
);

const answerClient = observeOpenAI(openai, {
  parent: trace,
  generationName: 'answer-generation',
  langfusePrompt: prompt
});

const answer = await answerClient.chat.completions.create({
  messages: messages,
  model: 'gpt-4',
  temperature: 0.7,
  max_tokens: 500
});

// Update trace with final output
trace.update({
  output: {
    answer: answer.choices[0].message.content,
    model: 'gpt-4',
    promptVersion: prompt.version
  }
});

// Add a quality score
trace.score({
  name: 'answer-quality',
  value: 0.95,
  comment: 'High quality answer with proper context'
});

// Flush all events
await langfuse.flushAsync();

// Get trace URL
console.log('View trace:', trace.getTraceUrl());

The wrapper preserves errors from the OpenAI SDK while still capturing trace information.
const tracedClient = observeOpenAI(client, {
  traceName: 'error-handling-example'
});

try {
  const response = await tracedClient.chat.completions.create({
    messages: [{ role: 'user', content: 'Hello' }],
    model: 'invalid-model' // This will cause an error
  });
} catch (error) {
  console.error('OpenAI error:', error);
  // Error is captured in the trace with ERROR level
} finally {
  await tracedClient.flushAsync();
}

The observeOpenAI wrapper adds minimal overhead.
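The low overhead comes from batching: events are queued in memory and sent in the background, with `flushAt` and `flushInterval` (see `clientInitParams` above) controlling when a batch goes out. A simplified, hypothetical sketch of the size-based part of that queueing idea (not langfuse's actual code):

```typescript
// Hypothetical sketch of size-based event batching (the flushAt idea).
// A real client also flushes on a timer (flushInterval) and on flushAsync().

type LangfuseEvent = { name: string; body: unknown };

class EventBatcher {
  private queue: LangfuseEvent[] = [];
  public sentBatches: LangfuseEvent[][] = [];

  constructor(private flushAt: number) {}

  enqueue(event: LangfuseEvent): void {
    this.queue.push(event);
    if (this.queue.length >= this.flushAt) this.flush();
  }

  flush(): void {
    if (this.queue.length === 0) return;
    // A real client would POST the batch to the Langfuse API here
    this.sentBatches.push(this.queue);
    this.queue = [];
  }
}

const batcher = new EventBatcher(2);
batcher.enqueue({ name: 'generation', body: { id: 1 } });
batcher.enqueue({ name: 'generation', body: { id: 2 } }); // reaches flushAt, sends a batch
batcher.enqueue({ name: 'score', body: { id: 3 } });
batcher.flush(); // manual flush, analogous to flushAsync()
```

This is why calling `flushAsync()` before a process exits matters: any events still sitting in the queue would otherwise never be sent.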
Best Practices:

// Good: Create one wrapper per workflow
const tracedClient = observeOpenAI(client, { traceName: 'workflow' });
await tracedClient.chat.completions.create(/* ... */);
await tracedClient.chat.completions.create(/* ... */);
await tracedClient.flushAsync();

// Avoid: Creating multiple wrappers for single calls
// This works but creates unnecessary overhead
const client1 = observeOpenAI(client, { traceName: 'call1' });
await client1.chat.completions.create(/* ... */);
await client1.flushAsync();

const client2 = observeOpenAI(client, { traceName: 'call2' });
await client2.chat.completions.create(/* ... */);
await client2.flushAsync();

Install with Tessl CLI
npx tessl i tessl/npm-langfuse