CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-langfuse--tracing

Langfuse instrumentation methods based on OpenTelemetry

Overview
Eval results
Files

active-observations.mddocs/

Active Observations

Active observations with startActiveObservation() provide automatic lifecycle management for function-scoped operations. This approach handles observation creation, context activation, and cleanup automatically, making it ideal for wrapping asynchronous operations and ensuring proper error handling.

Core Function

startActiveObservation

Starts an observation and executes a function within its context with automatic lifecycle management.

/**
 * Starts an active observation and executes a function within its context.
 *
 * @param name - Descriptive name for the observation
 * @param fn - Function to execute (receives typed observation instance)
 * @param options - Configuration including observation type
 * @returns The exact return value of the executed function
 */
function startActiveObservation<F extends (observation: LangfuseObservation) => unknown>(
  name: string,
  fn: F,
  options?: StartActiveObservationOpts
): ReturnType<F>;

// Type-specific overloads
function startActiveObservation<F extends (generation: LangfuseGeneration) => unknown>(
  name: string,
  fn: F,
  options: { asType: "generation" }
): ReturnType<F>;

function startActiveObservation<F extends (agent: LangfuseAgent) => unknown>(
  name: string,
  fn: F,
  options: { asType: "agent" }
): ReturnType<F>;

// ... additional overloads for tool, chain, retriever, evaluator, guardrail, embedding

interface StartActiveObservationOpts {
  /** Type of observation to create. Defaults to 'span' */
  asType?: LangfuseObservationType;
  /** Custom start time for the observation */
  startTime?: Date;
  /** Parent span context to attach this observation to */
  parentSpanContext?: SpanContext;
  /** Whether to automatically end the observation when exiting. Default is true */
  endOnExit?: boolean;
}

Key Features

Automatic Context Management

The observation is automatically set as the active span in the OpenTelemetry context, making it the parent for any child observations created within the function.

import { startActiveObservation } from '@langfuse/tracing';

await startActiveObservation('parent-operation', async (observation) => {
  observation.update({ input: { step: 'start' } });

  // This child observation automatically inherits the parent context
  await startActiveObservation('child-operation', async (child) => {
    child.update({ input: { substep: 'processing' } });
    // Process...
    child.update({ output: { result: 'done' } });
  });

  observation.update({ output: { status: 'complete' } });
});
// Parent and child are automatically ended

Lifecycle Automation

Observations are automatically ended when the function completes, whether it succeeds or throws an error.

// Automatic ending on success
const result = await startActiveObservation(
  'successful-operation',
  async (observation) => {
    observation.update({ input: { data: 'value' } });
    const result = await processData();
    observation.update({ output: result });
    return result;
  }
);
// Observation automatically ended with success status

// Automatic ending on error
try {
  await startActiveObservation('failing-operation', async (observation) => {
    observation.update({ input: { data: 'value' } });
    throw new Error('Operation failed');
  });
} catch (error) {
  // Observation automatically ended with error status
}

Type Safety

The function parameter is strongly typed based on the asType option, providing full IntelliSense support.

// TypeScript knows 'generation' is LangfuseGeneration
await startActiveObservation(
  'llm-call',
  async (generation) => {
    // generation.update accepts LangfuseGenerationAttributes
    generation.update({
      model: 'gpt-4',
      modelParameters: { temperature: 0.7 },
      usageDetails: { totalTokens: 500 }
    });
  },
  { asType: 'generation' }
);

Synchronous Functions

Active observations work seamlessly with both sync and async functions.

// Synchronous function
const result = startActiveObservation('sync-calculation', (observation) => {
  observation.update({ input: { x: 10, y: 20 } });

  const result = 10 + 20;

  observation.update({ output: { result } });
  return result;
});

console.log(result); // 30

Asynchronous Functions

The return type preserves the Promise wrapper for async functions.

// Asynchronous function
const result = await startActiveObservation(
  'async-operation',
  async (observation) => {
    observation.update({ input: { userId: '123' } });

    // Await internal operations
    const data = await fetchUserData('123');
    const processed = await processData(data);

    observation.update({ output: { processed } });
    return processed;
  }
);

console.log(result); // Processed data

Error Handling

Errors are automatically captured and the observation is marked with error status.

import { startActiveObservation } from '@langfuse/tracing';

try {
  await startActiveObservation(
    'risky-operation',
    async (observation) => {
      observation.update({
        input: { operation: 'process-payment' }
      });

      try {
        const result = await processPayment();
        observation.update({ output: result });
        return result;
      } catch (error) {
        // Manually capture error details
        observation.update({
          level: 'ERROR',
          statusMessage: error.message,
          output: { error: error.message, code: error.code }
        });
        throw error; // Re-throw to propagate
      }
    }
  );
} catch (error) {
  // Error was captured in observation and is now available here
  console.error('Operation failed:', error);
}

Nested Operations

Child observations created within the function automatically inherit the context.

// RAG Chain with nested operations
const answer = await startActiveObservation(
  'rag-qa-chain',
  async (chain) => {
    chain.update({
      input: { question: 'How does photosynthesis work?' },
      metadata: { vectorDb: 'pinecone', model: 'gpt-4' }
    });

    // Retrieval step - inherits chain context
    const docs = await startActiveObservation(
      'vector-retrieval',
      async (retriever) => {
        retriever.update({
          input: { query: 'photosynthesis mechanism', topK: 5 }
        });

        const results = await vectorSearch('photosynthesis mechanism');

        retriever.update({
          output: { documents: results, count: results.length }
        });

        return results;
      },
      { asType: 'retriever' }
    );

    // Generation step - also inherits chain context
    const response = await startActiveObservation(
      'answer-generation',
      async (generation) => {
        const context = docs.map(d => d.content).join('\n');

        generation.update({
          input: { question: 'How does photosynthesis work?', context },
          model: 'gpt-4'
        });

        const answer = await generateAnswer(context);

        generation.update({
          output: { answer },
          usageDetails: { totalTokens: 450 }
        });

        return answer;
      },
      { asType: 'generation' }
    );

    chain.update({
      output: { answer: response, sources: docs.length }
    });

    return response;
  },
  { asType: 'chain' }
);

Examples by Observation Type

Span

Default observation type for general operations.

const result = await startActiveObservation(
  'data-processing',
  async (span) => {
    span.update({
      input: { records: 1000 },
      metadata: { version: '2.1.0' }
    });

    const processed = await processRecords(records);

    span.update({
      output: { processedCount: processed.length },
      metadata: { duration: Date.now() - startTime }
    });

    return processed;
  }
);

Generation

For LLM calls and AI model interactions.

const response = await startActiveObservation(
  'openai-completion',
  async (generation) => {
    generation.update({
      input: [
        { role: 'system', content: 'You are a helpful assistant' },
        { role: 'user', content: 'Explain AI ethics' }
      ],
      model: 'gpt-4-turbo',
      modelParameters: { temperature: 0.7, maxTokens: 500 }
    });

    const result = await openai.chat.completions.create({
      model: 'gpt-4-turbo',
      messages: [
        { role: 'system', content: 'You are a helpful assistant' },
        { role: 'user', content: 'Explain AI ethics' }
      ],
      temperature: 0.7,
      max_tokens: 500
    });

    generation.update({
      output: result.choices[0].message,
      usageDetails: {
        promptTokens: result.usage.prompt_tokens,
        completionTokens: result.usage.completion_tokens,
        totalTokens: result.usage.total_tokens
      },
      costDetails: { totalCost: 0.002, currency: 'USD' }
    });

    return result.choices[0].message.content;
  },
  { asType: 'generation' }
);

Agent

For AI agent workflows with tool usage.

const agentResult = await startActiveObservation(
  'research-agent',
  async (agent) => {
    agent.update({
      input: { query: 'Latest climate change research' },
      metadata: { tools: ['web-search', 'arxiv-search'], model: 'gpt-4' }
    });

    // Tool calls inherit the agent context automatically
    const webResults = await startActiveObservation(
      'web-search-tool',
      async (tool) => {
        tool.update({ input: { query: 'climate change 2024' } });
        const results = await searchWeb('climate change 2024');
        tool.update({ output: results });
        return results;
      },
      { asType: 'tool' }
    );

    const analysis = await analyzeResults(webResults);

    agent.update({
      output: { analysis, sources: webResults.length },
      metadata: { processingTime: Date.now() }
    });

    return analysis;
  },
  { asType: 'agent' }
);

Tool

For individual tool calls and API interactions.

const searchResults = await startActiveObservation(
  'web-search',
  async (tool) => {
    tool.update({
      input: { query: 'latest AI news', maxResults: 10 },
      metadata: { provider: 'google-api' }
    });

    const results = await performWebSearch('latest AI news', 10);

    tool.update({
      output: {
        results: results,
        count: results.length,
        relevanceScore: 0.89
      },
      metadata: {
        latency: 1200,
        cacheHit: false
      }
    });

    return results;
  },
  { asType: 'tool' }
);

Retriever

For document retrieval and search operations.

const documents = await startActiveObservation(
  'vector-search',
  async (retriever) => {
    retriever.update({
      input: {
        query: 'machine learning algorithms',
        topK: 5,
        threshold: 0.7
      },
      metadata: {
        vectorStore: 'pinecone',
        embeddingModel: 'text-embedding-ada-002'
      }
    });

    const results = await vectorDB.search({
      query: 'machine learning algorithms',
      topK: 5
    });

    retriever.update({
      output: {
        documents: results,
        count: results.length,
        avgSimilarity: 0.85
      },
      metadata: { searchLatency: 120 }
    });

    return results;
  },
  { asType: 'retriever' }
);

Evaluator

For quality assessment and evaluation.

const evaluation = await startActiveObservation(
  'response-evaluator',
  async (evaluator) => {
    evaluator.update({
      input: {
        response: 'Paris is the capital of France.',
        reference: 'The capital city of France is Paris.',
        criteria: ['accuracy', 'clarity']
      },
      metadata: { metric: 'semantic-similarity' }
    });

    const score = await calculateSimilarity(response, reference);
    const passed = score > 0.8;

    evaluator.update({
      output: {
        score,
        passed,
        grade: passed ? 'excellent' : 'needs_improvement'
      }
    });

    return { score, passed };
  },
  { asType: 'evaluator' }
);

Guardrail

For safety checks and content filtering.

const safetyCheck = await startActiveObservation(
  'content-guardrail',
  async (guardrail) => {
    guardrail.update({
      input: {
        text: userMessage,
        policies: ['no-profanity', 'no-pii']
      },
      metadata: { strictMode: true }
    });

    const violations = await checkContent(userMessage);
    const allowed = violations.length === 0;

    guardrail.update({
      output: { allowed, violations, confidence: 0.95 }
    });

    return { allowed, violations };
  },
  { asType: 'guardrail' }
);

Embedding

For text embedding generation.

const embeddings = await startActiveObservation(
  'text-embeddings',
  async (embedding) => {
    const texts = ['Hello world', 'Machine learning'];

    embedding.update({
      input: { texts },
      model: 'text-embedding-ada-002',
      metadata: { dimensions: 1536 }
    });

    const vectors = await generateEmbeddings(texts);

    embedding.update({
      output: { embeddings: vectors, count: vectors.length },
      usageDetails: { totalTokens: texts.join(' ').split(' ').length }
    });

    return vectors;
  },
  { asType: 'embedding' }
);

Advanced Options

Custom Start Time

Specify a custom start time for the observation.

const result = await startActiveObservation(
  'backdated-operation',
  async (observation) => {
    observation.update({ input: { data: 'value' } });
    const result = await process();
    observation.update({ output: result });
    return result;
  },
  { startTime: new Date('2024-01-01T10:00:00Z') }
);

Disable Automatic Ending

For long-running operations that need manual control.

const observation = await startActiveObservation(
  'background-process',
  async (span) => {
    span.update({ input: { taskId: '123' } });

    // Start background task that continues after function returns
    startBackgroundTask(span);

    return 'process-started';
  },
  { asType: 'span', endOnExit: false }
);

// Later, manually end the observation
// observation.end(); // Note: You need to keep a reference to the observation

Parent Context

Link to a specific parent observation.

const parentSpan = startObservation('parent');
const parentContext = parentSpan.otelSpan.spanContext();

await startActiveObservation(
  'child-with-custom-parent',
  async (child) => {
    child.update({ input: { step: 'processing' } });
    // Process...
    child.update({ output: { result: 'done' } });
  },
  { parentSpanContext: parentContext }
);

parentSpan.end();

Context Propagation

Active observations automatically propagate context through the call stack.

async function levelOne() {
  return await startActiveObservation('level-1', async (obs1) => {
    obs1.update({ input: { level: 1 } });

    const result = await levelTwo();

    obs1.update({ output: { result } });
    return result;
  });
}

async function levelTwo() {
  // Automatically becomes child of level-1
  return await startActiveObservation('level-2', async (obs2) => {
    obs2.update({ input: { level: 2 } });

    const result = await levelThree();

    obs2.update({ output: { result } });
    return result;
  });
}

async function levelThree() {
  // Automatically becomes child of level-2
  return await startActiveObservation('level-3', async (obs3) => {
    obs3.update({ input: { level: 3 } });

    const result = await doWork();

    obs3.update({ output: { result } });
    return result;
  });
}

// Creates a 3-level hierarchy automatically
await levelOne();

Best Practices

Use for Function-Scoped Operations

Active observations are ideal when the observation lifecycle matches a function's execution.

// Good: Observation scope matches function scope
async function processOrder(orderId: string) {
  return await startActiveObservation(
    'process-order',
    async (observation) => {
      observation.update({ input: { orderId } });
      // All processing happens within the observation
      const result = await performProcessing(orderId);
      observation.update({ output: result });
      return result;
    }
  );
}

Combine with Manual Observations

Mix manual and active observations based on needs.

// Manual parent for long-lived workflow
const workflow = startObservation('long-workflow');

// Active child for scoped operation
const stepResult = await startActiveObservation(
  'workflow-step',
  async (step) => {
    step.update({ input: { data: 'value' } });
    const result = await processStep();
    step.update({ output: result });
    return result;
  },
  { parentSpanContext: workflow.otelSpan.spanContext() }
);

// Continue with more work...
workflow.update({ output: { stepResult } });
workflow.end();

Handle Errors Gracefully

Add try-catch blocks for custom error handling.

await startActiveObservation(
  'operation-with-recovery',
  async (observation) => {
    observation.update({ input: { attempt: 1 } });

    try {
      const result = await riskyOperation();
      observation.update({ output: result });
      return result;
    } catch (error) {
      observation.update({
        level: 'ERROR',
        statusMessage: error.message,
        output: { error: error.message }
      });

      // Try recovery
      const fallback = await fallbackOperation();
      observation.update({
        level: 'WARNING',
        output: { fallback, recovered: true }
      });

      return fallback;
    }
  }
);

Return Values

Always return values from the callback function to preserve function behavior.

// Good: Return value preserved
const result = await startActiveObservation(
  'calculation',
  async (obs) => {
    const value = await compute();
    obs.update({ output: value });
    return value; // Return the value
  }
);

console.log(result); // Access the computed value

// Avoid: Missing return
await startActiveObservation('calculation', async (obs) => {
  const value = await compute();
  obs.update({ output: value });
  // Missing return - caller won't get the value
});

Install with Tessl CLI

npx tessl i tessl/npm-langfuse--tracing

docs

active-observations.md

attribute-creation.md

context-management.md

index.md

manual-observations.md

observation-types.md

observe-decorator.md

otel-span-attributes.md

trace-id-generation.md

tracer-provider.md

tile.json