Langfuse instrumentation methods based on OpenTelemetry
Active observations with startActiveObservation() provide automatic lifecycle management for function-scoped operations. This approach handles observation creation, context activation, and cleanup automatically, making it ideal for wrapping asynchronous operations and ensuring proper error handling.
Starts an observation and executes a function within its context with automatic lifecycle management.
/**
* Starts an active observation and executes a function within its context.
*
* @param name - Descriptive name for the observation
* @param fn - Function to execute (receives typed observation instance)
* @param options - Configuration including observation type
* @returns The exact return value of the executed function
*/
function startActiveObservation<F extends (observation: LangfuseObservation) => unknown>(
name: string,
fn: F,
options?: StartActiveObservationOpts
): ReturnType<F>;
// Type-specific overloads
function startActiveObservation<F extends (generation: LangfuseGeneration) => unknown>(
name: string,
fn: F,
options: { asType: "generation" }
): ReturnType<F>;
function startActiveObservation<F extends (agent: LangfuseAgent) => unknown>(
name: string,
fn: F,
options: { asType: "agent" }
): ReturnType<F>;
// ... additional overloads for tool, chain, retriever, evaluator, guardrail, embedding
interface StartActiveObservationOpts {
/** Type of observation to create. Defaults to 'span' */
asType?: LangfuseObservationType;
/** Custom start time for the observation */
startTime?: Date;
/** Parent span context to attach this observation to */
parentSpanContext?: SpanContext;
/** Whether to automatically end the observation when exiting. Default is true */
endOnExit?: boolean;
}

The observation is automatically set as the active span in the OpenTelemetry context, making it the parent for any child observations created within the function.
import { startActiveObservation } from '@langfuse/tracing';
await startActiveObservation('parent-operation', async (observation) => {
observation.update({ input: { step: 'start' } });
// This child observation automatically inherits the parent context
await startActiveObservation('child-operation', async (child) => {
child.update({ input: { substep: 'processing' } });
// Process...
child.update({ output: { result: 'done' } });
});
observation.update({ output: { status: 'complete' } });
});
// Parent and child are automatically ended

Observations are automatically ended when the function completes, whether it succeeds or throws an error.
// Automatic ending on success
const result = await startActiveObservation(
'successful-operation',
async (observation) => {
observation.update({ input: { data: 'value' } });
const result = await processData();
observation.update({ output: result });
return result;
}
);
// Observation automatically ended with success status
// Automatic ending on error
try {
await startActiveObservation('failing-operation', async (observation) => {
observation.update({ input: { data: 'value' } });
throw new Error('Operation failed');
});
} catch (error) {
// Observation automatically ended with error status
}

The function parameter is strongly typed based on the asType option, providing full IntelliSense support.
// TypeScript knows 'generation' is LangfuseGeneration
await startActiveObservation(
'llm-call',
async (generation) => {
// generation.update accepts LangfuseGenerationAttributes
generation.update({
model: 'gpt-4',
modelParameters: { temperature: 0.7 },
usageDetails: { totalTokens: 500 }
});
},
{ asType: 'generation' }
);

Active observations work seamlessly with both sync and async functions.
// Synchronous function
const result = startActiveObservation('sync-calculation', (observation) => {
observation.update({ input: { x: 10, y: 20 } });
const result = 10 + 20;
observation.update({ output: { result } });
return result;
});
console.log(result); // 30

The return type preserves the Promise wrapper for async functions.
// Asynchronous function
const result = await startActiveObservation(
'async-operation',
async (observation) => {
observation.update({ input: { userId: '123' } });
// Await internal operations
const data = await fetchUserData('123');
const processed = await processData(data);
observation.update({ output: { processed } });
return processed;
}
);
console.log(result); // Processed data

Errors are automatically captured and the observation is marked with error status.
import { startActiveObservation } from '@langfuse/tracing';
try {
await startActiveObservation(
'risky-operation',
async (observation) => {
observation.update({
input: { operation: 'process-payment' }
});
try {
const result = await processPayment();
observation.update({ output: result });
return result;
} catch (error) {
// Manually capture error details
observation.update({
level: 'ERROR',
statusMessage: error.message,
output: { error: error.message, code: error.code }
});
throw error; // Re-throw to propagate
}
}
);
} catch (error) {
// Error was captured in observation and is now available here
console.error('Operation failed:', error);
}

Child observations created within the function automatically inherit the context.
// RAG Chain with nested operations
const answer = await startActiveObservation(
'rag-qa-chain',
async (chain) => {
chain.update({
input: { question: 'How does photosynthesis work?' },
metadata: { vectorDb: 'pinecone', model: 'gpt-4' }
});
// Retrieval step - inherits chain context
const docs = await startActiveObservation(
'vector-retrieval',
async (retriever) => {
retriever.update({
input: { query: 'photosynthesis mechanism', topK: 5 }
});
const results = await vectorSearch('photosynthesis mechanism');
retriever.update({
output: { documents: results, count: results.length }
});
return results;
},
{ asType: 'retriever' }
);
// Generation step - also inherits chain context
const response = await startActiveObservation(
'answer-generation',
async (generation) => {
const context = docs.map(d => d.content).join('\n');
generation.update({
input: { question: 'How does photosynthesis work?', context },
model: 'gpt-4'
});
const answer = await generateAnswer(context);
generation.update({
output: { answer },
usageDetails: { totalTokens: 450 }
});
return answer;
},
{ asType: 'generation' }
);
chain.update({
output: { answer: response, sources: docs.length }
});
return response;
},
{ asType: 'chain' }
);

Span — the default observation type, for general operations.
const result = await startActiveObservation(
'data-processing',
async (span) => {
const startTime = Date.now(); // referenced in the duration metadata below
span.update({
input: { records: 1000 },
metadata: { version: '2.1.0' }
});
const processed = await processRecords(records);
span.update({
output: { processedCount: processed.length },
metadata: { duration: Date.now() - startTime }
});
return processed;
}
);

Generation — for LLM calls and AI model interactions.
const response = await startActiveObservation(
'openai-completion',
async (generation) => {
generation.update({
input: [
{ role: 'system', content: 'You are a helpful assistant' },
{ role: 'user', content: 'Explain AI ethics' }
],
model: 'gpt-4-turbo',
modelParameters: { temperature: 0.7, maxTokens: 500 }
});
const result = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [
{ role: 'system', content: 'You are a helpful assistant' },
{ role: 'user', content: 'Explain AI ethics' }
],
temperature: 0.7,
max_tokens: 500
});
generation.update({
output: result.choices[0].message,
usageDetails: {
promptTokens: result.usage.prompt_tokens,
completionTokens: result.usage.completion_tokens,
totalTokens: result.usage.total_tokens
},
costDetails: { totalCost: 0.002, currency: 'USD' }
});
return result.choices[0].message.content;
},
{ asType: 'generation' }
);

Agent — for AI agent workflows with tool usage.
const agentResult = await startActiveObservation(
'research-agent',
async (agent) => {
agent.update({
input: { query: 'Latest climate change research' },
metadata: { tools: ['web-search', 'arxiv-search'], model: 'gpt-4' }
});
// Tool calls inherit the agent context automatically
const webResults = await startActiveObservation(
'web-search-tool',
async (tool) => {
tool.update({ input: { query: 'climate change 2024' } });
const results = await searchWeb('climate change 2024');
tool.update({ output: results });
return results;
},
{ asType: 'tool' }
);
const analysis = await analyzeResults(webResults);
agent.update({
output: { analysis, sources: webResults.length },
metadata: { processingTime: Date.now() }
});
return analysis;
},
{ asType: 'agent' }
);

Tool — for individual tool calls and API interactions.
const searchResults = await startActiveObservation(
'web-search',
async (tool) => {
tool.update({
input: { query: 'latest AI news', maxResults: 10 },
metadata: { provider: 'google-api' }
});
const results = await performWebSearch('latest AI news', 10);
tool.update({
output: {
results: results,
count: results.length,
relevanceScore: 0.89
},
metadata: {
latency: 1200,
cacheHit: false
}
});
return results;
},
{ asType: 'tool' }
);

Retriever — for document retrieval and search operations.
const documents = await startActiveObservation(
'vector-search',
async (retriever) => {
retriever.update({
input: {
query: 'machine learning algorithms',
topK: 5,
threshold: 0.7
},
metadata: {
vectorStore: 'pinecone',
embeddingModel: 'text-embedding-ada-002'
}
});
const results = await vectorDB.search({
query: 'machine learning algorithms',
topK: 5
});
retriever.update({
output: {
documents: results,
count: results.length,
avgSimilarity: 0.85
},
metadata: { searchLatency: 120 }
});
return results;
},
{ asType: 'retriever' }
);

Evaluator — for quality assessment and evaluation.
const evaluation = await startActiveObservation(
'response-evaluator',
async (evaluator) => {
const response = 'Paris is the capital of France.';
const reference = 'The capital city of France is Paris.';
evaluator.update({
input: { response, reference, criteria: ['accuracy', 'clarity'] },
metadata: { metric: 'semantic-similarity' }
});
const score = await calculateSimilarity(response, reference);
const passed = score > 0.8;
evaluator.update({
output: {
score,
passed,
grade: passed ? 'excellent' : 'needs_improvement'
}
});
return { score, passed };
},
{ asType: 'evaluator' }
);

Guardrail — for safety checks and content filtering.
const safetyCheck = await startActiveObservation(
'content-guardrail',
async (guardrail) => {
guardrail.update({
input: {
text: userMessage,
policies: ['no-profanity', 'no-pii']
},
metadata: { strictMode: true }
});
const violations = await checkContent(userMessage);
const allowed = violations.length === 0;
guardrail.update({
output: { allowed, violations, confidence: 0.95 }
});
return { allowed, violations };
},
{ asType: 'guardrail' }
);

Embedding — for text embedding generation.
const embeddings = await startActiveObservation(
'text-embeddings',
async (embedding) => {
const texts = ['Hello world', 'Machine learning'];
embedding.update({
input: { texts },
model: 'text-embedding-ada-002',
metadata: { dimensions: 1536 }
});
const vectors = await generateEmbeddings(texts);
embedding.update({
output: { embeddings: vectors, count: vectors.length },
usageDetails: { totalTokens: texts.join(' ').split(' ').length }
});
return vectors;
},
{ asType: 'embedding' }
);

Specify a custom start time for the observation.
const result = await startActiveObservation(
'backdated-operation',
async (observation) => {
observation.update({ input: { data: 'value' } });
const result = await process();
observation.update({ output: result });
return result;
},
{ startTime: new Date('2024-01-01T10:00:00Z') }
);

Set endOnExit: false for long-running operations that need manual control.
const result = await startActiveObservation(
'background-process',
async (span) => {
span.update({ input: { taskId: '123' } });
// Start background task that continues after function returns
startBackgroundTask(span);
return 'process-started';
},
{ asType: 'span', endOnExit: false }
);
// Later, manually end the observation. Note: startActiveObservation returns the
// callback's return value ('process-started'), not the observation itself, so
// keep your own reference to the span (e.g. via an outer variable, or by passing
// it to the background task) and call end() on that reference:
// span.end();

Link to a specific parent observation.
const parentSpan = startObservation('parent');
const parentContext = parentSpan.otelSpan.spanContext();
await startActiveObservation(
'child-with-custom-parent',
async (child) => {
child.update({ input: { step: 'processing' } });
// Process...
child.update({ output: { result: 'done' } });
},
{ parentSpanContext: parentContext }
);
parentSpan.end();

Active observations automatically propagate context through the call stack.
async function levelOne() {
return await startActiveObservation('level-1', async (obs1) => {
obs1.update({ input: { level: 1 } });
const result = await levelTwo();
obs1.update({ output: { result } });
return result;
});
}
async function levelTwo() {
// Automatically becomes child of level-1
return await startActiveObservation('level-2', async (obs2) => {
obs2.update({ input: { level: 2 } });
const result = await levelThree();
obs2.update({ output: { result } });
return result;
});
}
async function levelThree() {
// Automatically becomes child of level-2
return await startActiveObservation('level-3', async (obs3) => {
obs3.update({ input: { level: 3 } });
const result = await doWork();
obs3.update({ output: { result } });
return result;
});
}
// Creates a 3-level hierarchy automatically
await levelOne();

Active observations are ideal when the observation lifecycle matches a function's execution.
// Good: Observation scope matches function scope
async function processOrder(orderId: string) {
return await startActiveObservation(
'process-order',
async (observation) => {
observation.update({ input: { orderId } });
// All processing happens within the observation
const result = await performProcessing(orderId);
observation.update({ output: result });
return result;
}
);
}

Mix manual and active observations based on needs.
// Manual parent for long-lived workflow
const workflow = startObservation('long-workflow');
// Active child for scoped operation
const stepResult = await startActiveObservation(
'workflow-step',
async (step) => {
step.update({ input: { data: 'value' } });
const result = await processStep();
step.update({ output: result });
return result;
},
{ parentSpanContext: workflow.otelSpan.spanContext() }
);
// Continue with more work...
workflow.update({ output: { stepResult } });
workflow.end();

Add try-catch blocks for custom error handling.
await startActiveObservation(
'operation-with-recovery',
async (observation) => {
observation.update({ input: { attempt: 1 } });
try {
const result = await riskyOperation();
observation.update({ output: result });
return result;
} catch (error) {
observation.update({
level: 'ERROR',
statusMessage: error.message,
output: { error: error.message }
});
// Try recovery
const fallback = await fallbackOperation();
observation.update({
level: 'WARNING',
output: { fallback, recovered: true }
});
return fallback;
}
}
);

Always return values from the callback function to preserve function behavior.
// Good: Return value preserved
const result = await startActiveObservation(
'calculation',
async (obs) => {
const value = await compute();
obs.update({ output: value });
return value; // Return the value
}
);
console.log(result); // Access the computed value
// Avoid: Missing return
await startActiveObservation('calculation', async (obs) => {
const value = await compute();
obs.update({ output: value });
// Missing return - caller won't get the value
});

Install with Tessl CLI:
npx tessl i tessl/npm-langfuse--tracing