or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/npm-langfuse--otel

Langfuse OpenTelemetry integration enabling automatic export of OpenTelemetry spans to the Langfuse observability platform with support for data masking, span filtering, and media content handling.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/@langfuse/otel@4.2.x

To install, run

npx @tessl/cli install tessl/npm-langfuse--otel@4.2.0

index.mddocs/

Langfuse OpenTelemetry Integration

Langfuse OpenTelemetry Integration (@langfuse/otel) provides an OpenTelemetry span processor that exports spans to the Langfuse observability platform. It enables automatic capture and forwarding of OpenTelemetry traces from AI applications to Langfuse with built-in support for data masking, span filtering, and media content handling.

Package Information

  • Package Name: @langfuse/otel
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @langfuse/otel
  • Node Version: >=20
  • License: MIT

Core Imports

import { LangfuseSpanProcessor } from '@langfuse/otel';
import type {
  LangfuseSpanProcessorParams,
  MaskFunction,
  ShouldExportSpan
} from '@langfuse/otel';

For CommonJS:

const { LangfuseSpanProcessor } = require('@langfuse/otel');

Basic Usage

import { NodeSDK } from '@opentelemetry/sdk-node';
import { LangfuseSpanProcessor } from '@langfuse/otel';

// Create and configure the Langfuse span processor
const sdk = new NodeSDK({
  spanProcessors: [
    new LangfuseSpanProcessor({
      publicKey: 'pk_...',
      secretKey: 'sk_...',
      baseUrl: 'https://cloud.langfuse.com',
      environment: 'production'
    })
  ]
});

// Start the SDK
sdk.start();

// Your OpenTelemetry instrumented code will now automatically
// export spans to Langfuse

Architecture

The package implements a single main component:

  • LangfuseSpanProcessor: A SpanProcessor implementation that integrates with OpenTelemetry's tracing pipeline. It extends standard span processing to provide:
    • Automatic batching and flushing of spans to Langfuse
    • Media content extraction and upload from base64 data URIs
    • Data masking capabilities for sensitive information
    • Conditional span export based on custom logic
    • Environment and release tagging

The processor supports two export modes:

  • batched (default): Recommended for production environments with long-running processes. Spans are batched and exported in groups for optimal performance.
  • immediate: Recommended for short-lived environments such as serverless functions. Spans are exported immediately to prevent data loss when the process terminates.

Capabilities

Span Processor

OpenTelemetry span processor for exporting spans to Langfuse with advanced features.

/**
 * Creates a new LangfuseSpanProcessor instance.
 *
 * @param params - Configuration parameters for the processor
 */
class LangfuseSpanProcessor implements SpanProcessor {
  constructor(params?: LangfuseSpanProcessorParams);

  /**
   * Called when a span is started. Adds environment and release attributes.
   *
   * @param span - The span that was started
   * @param parentContext - The parent context
   */
  onStart(span: Span, parentContext: any): void;

  /**
   * Called when a span ends. Processes the span for export to Langfuse.
   * This method checks if the span should be exported, applies data masking,
   * handles media content extraction and upload, and passes the span to the
   * exporter.
   *
   * @param span - The span that ended
   */
  onEnd(span: ReadableSpan): void;

  /**
   * Forces an immediate flush of all pending spans and media uploads.
   *
   * @returns Promise that resolves when all pending operations are complete
   */
  forceFlush(): Promise<void>;

  /**
   * Gracefully shuts down the processor, ensuring all pending operations
   * are completed.
   *
   * @returns Promise that resolves when shutdown is complete
   */
  shutdown(): Promise<void>;
}

Usage Example with Data Masking:

import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  environment: 'staging',
  mask: ({ data }) => {
    // Mask sensitive data like passwords and API keys
    if (typeof data === 'string') {
      return data
        .replace(/password=\w+/g, 'password=***')
        .replace(/api_key=[\w-]+/g, 'api_key=***');
    }
    return data;
  }
});

Usage Example with Span Filtering:

import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  shouldExportSpan: ({ otelSpan }) => {
    // Only export spans from specific services or that meet certain criteria
    return otelSpan.name.startsWith('llm-') ||
           otelSpan.attributes['service.name'] === 'my-ai-service';
  }
});

Usage Example for Serverless:

import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  exportMode: 'immediate', // Export spans immediately in serverless
  timeout: 10 // Increase timeout for serverless cold starts
});

Configuration

Configuration parameters for the LangfuseSpanProcessor.

interface LangfuseSpanProcessorParams {
  /**
   * Custom OpenTelemetry span exporter. If not provided, a default OTLP
   * exporter will be used that sends spans to the Langfuse API.
   */
  exporter?: SpanExporter;

  /**
   * Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY
   * environment variable.
   */
  publicKey?: string;

  /**
   * Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY
   * environment variable.
   */
  secretKey?: string;

  /**
   * Langfuse instance base URL. Can also be set via LANGFUSE_BASE_URL
   * or LANGFUSE_BASEURL (legacy) environment variable.
   * @defaultValue "https://cloud.langfuse.com"
   */
  baseUrl?: string;

  /**
   * Number of spans to batch before flushing. Can also be set via
   * LANGFUSE_FLUSH_AT environment variable. Only applies when exportMode
   * is "batched".
   */
  flushAt?: number;

  /**
   * Flush interval in seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL
   * environment variable. Only applies when exportMode is "batched".
   */
  flushInterval?: number;

  /**
   * Function to mask sensitive data in spans before export. Applied to
   * span attributes containing input, output, and metadata fields.
   */
  mask?: MaskFunction;

  /**
   * Function to determine whether a span should be exported to Langfuse.
   * If this function returns false, the span will not be exported.
   */
  shouldExportSpan?: ShouldExportSpan;

  /**
   * Environment identifier for the traces. Can also be set via
   * LANGFUSE_TRACING_ENVIRONMENT environment variable. This value is added
   * as an attribute to all spans.
   */
  environment?: string;

  /**
   * Release identifier for the traces. Can also be set via LANGFUSE_RELEASE
   * environment variable. This value is added as an attribute to all spans.
   */
  release?: string;

  /**
   * Request timeout in seconds. Can also be set via LANGFUSE_TIMEOUT
   * environment variable.
   * @defaultValue 5
   */
  timeout?: number;

  /**
   * Additional HTTP headers to include with requests to the Langfuse API.
   */
  additionalHeaders?: Record<string, string>;

  /**
   * Span export mode to use.
   *
   * - **batched**: Recommended for production environments with long-running
   *   processes. Spans are batched and exported in groups for optimal
   *   performance.
   * - **immediate**: Recommended for short-lived environments such as
   *   serverless functions. Spans are exported immediately to prevent data
   *   loss when the process terminates or is frozen.
   *
   * @defaultValue "batched"
   */
  exportMode?: "immediate" | "batched";
}

Mask Function

Function type for masking sensitive data in spans before export.

/**
 * Function type for masking sensitive data in spans before export.
 *
 * The mask function is applied to span attributes that may contain sensitive
 * information (input, output, and metadata fields). It receives the data and
 * should return a masked version of it.
 *
 * @param params - Object containing the data to be masked
 * @param params.data - The data that should be masked (can be of any type)
 * @returns The masked data (can be of any type)
 */
type MaskFunction = (params: { data: any }) => any;

Usage Example:

const maskFunction: MaskFunction = ({ data }) => {
  if (typeof data === 'string') {
    // Mask credit card numbers
    data = data.replace(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/g, 'XXXX-XXXX-XXXX-XXXX');
    // Mask email addresses
    data = data.replace(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/g, '***@***.***');
  } else if (typeof data === 'object' && data !== null) {
    // Deep clone and mask object properties
    const masked = JSON.parse(JSON.stringify(data));
    if ('password' in masked) masked.password = '***';
    if ('apiKey' in masked) masked.apiKey = '***';
    return masked;
  }
  return data;
};

Should Export Span Function

Function type for determining whether a span should be exported to Langfuse.

/**
 * Function type for determining whether a span should be exported to Langfuse.
 *
 * This function is called for each span before it is exported. If it returns
 * false, the span will not be exported to Langfuse.
 *
 * @param params - Object containing the span to evaluate
 * @param params.otelSpan - The OpenTelemetry span to evaluate
 * @returns true if the span should be exported, false otherwise
 */
type ShouldExportSpan = (params: { otelSpan: ReadableSpan }) => boolean;

Usage Example:

const shouldExportSpan: ShouldExportSpan = ({ otelSpan }) => {
  // Only export spans that took longer than 100ms
  const durationMs = otelSpan.duration[0] * 1000 + otelSpan.duration[1] / 1000000;
  if (durationMs <= 100) return false;

  // Don't export health check spans
  if (otelSpan.name.includes('health-check')) return false;

  // Only export spans from production environment
  const env = otelSpan.attributes['deployment.environment'];
  return env === 'production';
};

Environment Variables

The package can be configured using the following environment variables:

  • LANGFUSE_PUBLIC_KEY - Langfuse public API key
  • LANGFUSE_SECRET_KEY - Langfuse secret API key
  • LANGFUSE_BASE_URL - Langfuse instance base URL (defaults to "https://cloud.langfuse.com")
  • LANGFUSE_BASEURL - Legacy alternative for LANGFUSE_BASE_URL
  • LANGFUSE_FLUSH_AT - Number of spans to batch before flushing (batched mode only)
  • LANGFUSE_FLUSH_INTERVAL - Flush interval in seconds (batched mode only)
  • LANGFUSE_TRACING_ENVIRONMENT - Environment identifier for traces
  • LANGFUSE_RELEASE - Release identifier for traces
  • LANGFUSE_TIMEOUT - Request timeout in seconds (defaults to 5)

Peer Dependencies

This package requires the following peer dependencies to be installed:

  • @opentelemetry/api ^1.9.0
  • @opentelemetry/core ^2.0.1
  • @opentelemetry/exporter-trace-otlp-http >=0.202.0 <1.0.0
  • @opentelemetry/sdk-trace-base ^2.0.1

Types

The package uses OpenTelemetry types from peer dependencies:

  • Span - from @opentelemetry/sdk-trace-base
  • ReadableSpan - from @opentelemetry/sdk-trace-base
  • SpanProcessor - from @opentelemetry/sdk-trace-base
  • SpanExporter - from @opentelemetry/sdk-trace-base

Error Handling

The processor handles errors gracefully:

  • If a MaskFunction throws an error, the affected attribute is fully masked with the string "<fully masked due to failed mask function>" and a warning is logged
  • If a ShouldExportSpan function throws an error, the span is excluded from export and an error is logged
  • Media upload failures are logged as errors but do not prevent span export
  • Authentication failures (missing public/secret key) are logged as warnings during initialization