Langfuse OpenTelemetry integration enabling automatic export of OpenTelemetry spans to the Langfuse observability platform with support for data masking, span filtering, and media content handling.
npx @tessl/cli install tessl/npm-langfuse--otel@4.2.0

Langfuse OpenTelemetry Integration (@langfuse/otel) provides an OpenTelemetry span processor that exports spans to the Langfuse observability platform. It enables automatic capture and forwarding of OpenTelemetry traces from AI applications to Langfuse with built-in support for data masking, span filtering, and media content handling.
npm install @langfuse/otel

import { LangfuseSpanProcessor } from '@langfuse/otel';
import type {
  LangfuseSpanProcessorParams,
  MaskFunction,
  ShouldExportSpan
} from '@langfuse/otel';

For CommonJS:
const { LangfuseSpanProcessor } = require('@langfuse/otel');

Basic usage:

import { NodeSDK } from '@opentelemetry/sdk-node';
import { LangfuseSpanProcessor } from '@langfuse/otel';

// Create and configure the Langfuse span processor
const sdk = new NodeSDK({
  spanProcessors: [
    new LangfuseSpanProcessor({
      publicKey: 'pk_...',
      secretKey: 'sk_...',
      baseUrl: 'https://cloud.langfuse.com',
      environment: 'production'
    })
  ]
});

// Start the SDK
sdk.start();

// Your OpenTelemetry instrumented code will now automatically
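// export spans to Langfuse

The package implements a single main component, LangfuseSpanProcessor: a SpanProcessor implementation that integrates with OpenTelemetry's tracing pipeline. It extends standard span processing to provide:
- Data masking of input, output, and metadata attributes before export
- Span filtering to control which spans are exported
- Media content extraction and upload
- Environment and release attributes added to every span

The processor supports two export modes:
- batched: recommended for production environments with long-running processes; spans are batched and exported in groups.
- immediate: recommended for short-lived environments such as serverless functions; spans are exported immediately to prevent data loss when the process terminates or is frozen.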
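The two modes, batched and immediate, are documented under the exportMode configuration parameter. As a rough illustration of choosing between them at startup, the sketch below picks a mode from the runtime environment. The helper name chooseExportMode and the list of marker variables are assumptions made for this example and are not part of @langfuse/otel; the markers are environment variables that AWS Lambda and Vercel set, and the list is not exhaustive.

```typescript
// Hypothetical helper for illustration; not part of @langfuse/otel.
type ExportMode = "immediate" | "batched";

// Environment variables set by some serverless platforms (assumed, non-exhaustive list).
const SERVERLESS_MARKERS = ["AWS_LAMBDA_FUNCTION_NAME", "VERCEL"];

// Returns "immediate" when any marker variable is set to a non-empty value,
// otherwise "batched" (the documented default mode).
function chooseExportMode(env: Record<string, string | undefined>): ExportMode {
  const isServerless = SERVERLESS_MARKERS.some((name) => Boolean(env[name]));
  return isServerless ? "immediate" : "batched";
}

console.log(chooseExportMode({ AWS_LAMBDA_FUNCTION_NAME: "my-fn" }));
console.log(chooseExportMode({}));
```

The result could then be passed as the exportMode option, for example new LangfuseSpanProcessor({ exportMode: chooseExportMode(process.env) }).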
OpenTelemetry span processor that exports spans to Langfuse, with data masking, span filtering, and media content handling.
/**
 * Creates a new LangfuseSpanProcessor instance.
 *
 * @param params - Configuration parameters for the processor
 */
class LangfuseSpanProcessor implements SpanProcessor {
  constructor(params?: LangfuseSpanProcessorParams);

  /**
   * Called when a span is started. Adds environment and release attributes.
   *
   * @param span - The span that was started
   * @param parentContext - The parent context
   */
  onStart(span: Span, parentContext: any): void;

  /**
   * Called when a span ends. Processes the span for export to Langfuse.
   * This method checks if the span should be exported, applies data masking,
   * handles media content extraction and upload, and passes the span to the
   * exporter.
   *
   * @param span - The span that ended
   */
  onEnd(span: ReadableSpan): void;

  /**
   * Forces an immediate flush of all pending spans and media uploads.
   *
   * @returns Promise that resolves when all pending operations are complete
   */
  forceFlush(): Promise<void>;

  /**
   * Gracefully shuts down the processor, ensuring all pending operations
   * are completed.
   *
   * @returns Promise that resolves when shutdown is complete
   */
  shutdown(): Promise<void>;
}

Usage Example with Data Masking:
import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  environment: 'staging',
  mask: ({ data }) => {
    // Mask sensitive data like passwords and API keys
    if (typeof data === 'string') {
      return data
        .replace(/password=\w+/g, 'password=***')
        .replace(/api_key=[\w-]+/g, 'api_key=***');
    }
    return data;
  }
});

Usage Example with Span Filtering:
import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  shouldExportSpan: ({ otelSpan }) => {
    // Only export spans from specific services or that meet certain criteria
    return otelSpan.name.startsWith('llm-') ||
      otelSpan.attributes['service.name'] === 'my-ai-service';
  }
});

Usage Example for Serverless:
import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  exportMode: 'immediate', // Export spans immediately in serverless
  timeout: 10 // Increase timeout for serverless cold starts
});

Configuration parameters for the LangfuseSpanProcessor.
interface LangfuseSpanProcessorParams {
  /**
   * Custom OpenTelemetry span exporter. If not provided, a default OTLP
   * exporter will be used that sends spans to the Langfuse API.
   */
  exporter?: SpanExporter;

  /**
   * Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY
   * environment variable.
   */
  publicKey?: string;

  /**
   * Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY
   * environment variable.
   */
  secretKey?: string;

  /**
   * Langfuse instance base URL. Can also be set via LANGFUSE_BASE_URL
   * or LANGFUSE_BASEURL (legacy) environment variable.
   * @defaultValue "https://cloud.langfuse.com"
   */
  baseUrl?: string;

  /**
   * Number of spans to batch before flushing. Can also be set via
   * LANGFUSE_FLUSH_AT environment variable. Only applies when exportMode
   * is "batched".
   */
  flushAt?: number;

  /**
   * Flush interval in seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL
   * environment variable. Only applies when exportMode is "batched".
   */
  flushInterval?: number;

  /**
   * Function to mask sensitive data in spans before export. Applied to
   * span attributes containing input, output, and metadata fields.
   */
  mask?: MaskFunction;

  /**
   * Function to determine whether a span should be exported to Langfuse.
   * If this function returns false, the span will not be exported.
   */
  shouldExportSpan?: ShouldExportSpan;

  /**
   * Environment identifier for the traces. Can also be set via
   * LANGFUSE_TRACING_ENVIRONMENT environment variable. This value is added
   * as an attribute to all spans.
   */
  environment?: string;

  /**
   * Release identifier for the traces. Can also be set via LANGFUSE_RELEASE
   * environment variable. This value is added as an attribute to all spans.
   */
  release?: string;

  /**
   * Request timeout in seconds. Can also be set via LANGFUSE_TIMEOUT
   * environment variable.
   * @defaultValue 5
   */
  timeout?: number;

  /**
   * Additional HTTP headers to include with requests to the Langfuse API.
   */
  additionalHeaders?: Record<string, string>;

  /**
   * Span export mode to use.
   *
   * - **batched**: Recommended for production environments with long-running
   *   processes. Spans are batched and exported in groups for optimal
   *   performance.
   * - **immediate**: Recommended for short-lived environments such as
   *   serverless functions. Spans are exported immediately to prevent data
   *   loss when the process terminates or is frozen.
   *
   * @defaultValue "batched"
   */
  exportMode?: "immediate" | "batched";
}

Function type for masking sensitive data in spans before export.
/**
 * Function type for masking sensitive data in spans before export.
 *
 * The mask function is applied to span attributes that may contain sensitive
 * information (input, output, and metadata fields). It receives the data and
 * should return a masked version of it.
 *
 * @param params - Object containing the data to be masked
 * @param params.data - The data that should be masked (can be of any type)
 * @returns The masked data (can be of any type)
 */
type MaskFunction = (params: { data: any }) => any;

Usage Example:
const maskFunction: MaskFunction = ({ data }) => {
  if (typeof data === 'string') {
    // Mask credit card numbers
    data = data.replace(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/g, 'XXXX-XXXX-XXXX-XXXX');
    // Mask email addresses
    data = data.replace(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, '***@***.***');
  } else if (typeof data === 'object' && data !== null) {
    // Deep clone and mask object properties
    const masked = JSON.parse(JSON.stringify(data));
    if ('password' in masked) masked.password = '***';
    if ('apiKey' in masked) masked.apiKey = '***';
    return masked;
  }
  return data;
};

Function type for determining whether a span should be exported to Langfuse.
/**
 * Function type for determining whether a span should be exported to Langfuse.
 *
 * This function is called for each span before it is exported. If it returns
 * false, the span will not be exported to Langfuse.
 *
 * @param params - Object containing the span to evaluate
 * @param params.otelSpan - The OpenTelemetry span to evaluate
 * @returns true if the span should be exported, false otherwise
 */
type ShouldExportSpan = (params: { otelSpan: ReadableSpan }) => boolean;

Usage Example:
const shouldExportSpan: ShouldExportSpan = ({ otelSpan }) => {
  // Only export spans that took longer than 100ms.
  // duration is an HrTime tuple: [seconds, nanoseconds]
  const durationMs = otelSpan.duration[0] * 1000 + otelSpan.duration[1] / 1000000;
  if (durationMs <= 100) return false;

  // Don't export health check spans
  if (otelSpan.name.includes('health-check')) return false;

  // Only export spans from production environment
  const env = otelSpan.attributes['deployment.environment'];
  return env === 'production';
};

The package can be configured using the following environment variables:
- LANGFUSE_PUBLIC_KEY - Langfuse public API key
- LANGFUSE_SECRET_KEY - Langfuse secret API key
- LANGFUSE_BASE_URL - Langfuse instance base URL (defaults to "https://cloud.langfuse.com")
- LANGFUSE_BASEURL - Legacy alternative for LANGFUSE_BASE_URL
- LANGFUSE_FLUSH_AT - Number of spans to batch before flushing (batched mode only)
- LANGFUSE_FLUSH_INTERVAL - Flush interval in seconds (batched mode only)
- LANGFUSE_TRACING_ENVIRONMENT - Environment identifier for traces
- LANGFUSE_RELEASE - Release identifier for traces
- LANGFUSE_TIMEOUT - Request timeout in seconds (defaults to 5)

This package requires the following peer dependencies to be installed:
- @opentelemetry/api ^1.9.0
- @opentelemetry/core ^2.0.1
- @opentelemetry/exporter-trace-otlp-http >=0.202.0 <1.0.0
- @opentelemetry/sdk-trace-base ^2.0.1

The package uses OpenTelemetry types from peer dependencies:
- Span - from @opentelemetry/sdk-trace-base
- ReadableSpan - from @opentelemetry/sdk-trace-base
- SpanProcessor - from @opentelemetry/sdk-trace-base
- SpanExporter - from @opentelemetry/sdk-trace-base

The processor handles errors gracefully:
- If the MaskFunction throws an error, the affected attribute is fully masked with the string "<fully masked due to failed mask function>" and a warning is logged.
- If the ShouldExportSpan function throws an error, the span is excluded from export and an error is logged.
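
The fallback behavior described above can be pictured as a pair of try/catch wrappers around the user-supplied callbacks. The following is a minimal sketch of that behavior, not the library's actual source: the helper names applyMaskSafely and shouldExportSafely are hypothetical, and the log messages are placeholders. Only the fallback string and the "exclude on error" outcome come from the description above.

```typescript
// Hypothetical helpers for illustration; not part of @langfuse/otel.
type MaskFunction = (params: { data: any }) => any;

// Replacement value documented for attributes whose mask function failed.
const FULLY_MASKED = "<fully masked due to failed mask function>";

// Runs the mask function; on failure, logs a warning and fully masks the value.
function applyMaskSafely(mask: MaskFunction, data: unknown): unknown {
  try {
    return mask({ data });
  } catch (err) {
    console.warn("Mask function failed; attribute fully masked.", err);
    return FULLY_MASKED;
  }
}

// Runs the export predicate; on failure, logs an error and excludes the span.
function shouldExportSafely<S>(
  predicate: (params: { otelSpan: S }) => boolean,
  otelSpan: S
): boolean {
  try {
    return predicate({ otelSpan });
  } catch (err) {
    console.error("shouldExportSpan failed; span excluded from export.", err);
    return false;
  }
}
```

One consequence worth noting: a throwing mask function never leaks the unmasked value, and a throwing filter drops the span rather than exporting it unfiltered, so both callbacks fail closed.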