tessl/npm-langfuse--otel

Langfuse OpenTelemetry integration enabling automatic export of OpenTelemetry spans to the Langfuse observability platform with support for data masking, span filtering, and media content handling.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: `npmpkg:npm/@langfuse/otel@4.2.x`

To install, run

npx @tessl/cli install tessl/npm-langfuse--otel@4.2.0

# Langfuse OpenTelemetry Integration

Langfuse OpenTelemetry Integration (`@langfuse/otel`) provides an OpenTelemetry span processor that exports spans to the Langfuse observability platform. It enables automatic capture and forwarding of OpenTelemetry traces from AI applications to Langfuse with built-in support for data masking, span filtering, and media content handling.

## Package Information

- **Package Name**: @langfuse/otel
- **Package Type**: npm
- **Language**: TypeScript
- **Installation**: `npm install @langfuse/otel`
- **Node Version**: >=20
- **License**: MIT

## Core Imports

```typescript
import { LangfuseSpanProcessor } from '@langfuse/otel';
import type {
  LangfuseSpanProcessorParams,
  MaskFunction,
  ShouldExportSpan
} from '@langfuse/otel';
```

For CommonJS:

```javascript
const { LangfuseSpanProcessor } = require('@langfuse/otel');
```

## Basic Usage

```typescript
import { NodeSDK } from '@opentelemetry/sdk-node';
import { LangfuseSpanProcessor } from '@langfuse/otel';

// Create and configure the Langfuse span processor
const sdk = new NodeSDK({
  spanProcessors: [
    new LangfuseSpanProcessor({
      publicKey: 'pk_...',
      secretKey: 'sk_...',
      baseUrl: 'https://cloud.langfuse.com',
      environment: 'production'
    })
  ]
});

// Start the SDK
sdk.start();

// Your OpenTelemetry instrumented code will now automatically
// export spans to Langfuse
```

## Architecture

The package implements a single main component:

- **LangfuseSpanProcessor**: A `SpanProcessor` implementation that integrates with OpenTelemetry's tracing pipeline. It extends standard span processing to provide:
  - Automatic batching and flushing of spans to Langfuse
  - Media content extraction and upload from base64 data URIs
  - Data masking capabilities for sensitive information
  - Conditional span export based on custom logic
  - Environment and release tagging

The processor supports two export modes:

- **batched** (default): Recommended for production environments with long-running processes. Spans are batched and exported in groups for optimal performance.
- **immediate**: Recommended for short-lived environments such as serverless functions. Spans are exported immediately to prevent data loss when the process terminates.
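
The difference between the two modes can be pictured with a toy model. The sketch below is not the library's implementation; `createToySink`, `ToySink`, and the `send` callback are hypothetical names, and the time-based flush of batched mode is left out for brevity. It only illustrates why immediate mode issues one send per span while batched mode waits until `flushAt` spans have accumulated.

```typescript
// Toy model only: NOT how @langfuse/otel is implemented.
type ExportMode = "immediate" | "batched";

interface ToySink {
  onEnd(spanName: string): void;
  flush(): void;
}

// `send` is a hypothetical transport that receives one batch per call.
function createToySink(
  mode: ExportMode,
  flushAt: number,
  send: (batch: string[]) => void,
): ToySink {
  let buffer: string[] = [];

  // Sends whatever is buffered; does nothing when the buffer is empty.
  const flush = (): void => {
    if (buffer.length === 0) return;
    send(buffer);
    buffer = [];
  };

  // Called whenever a span ends.
  const onEnd = (spanName: string): void => {
    if (mode === "immediate") {
      send([spanName]); // one send per span, nothing is held back
      return;
    }
    buffer.push(spanName);
    if (buffer.length >= flushAt) flush();
  };

  return { onEnd, flush };
}
```

In this model a process that exits with spans still in the buffer loses them, which is the risk the immediate mode is meant to avoid in short-lived environments.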

## Capabilities

### Span Processor

OpenTelemetry span processor for exporting spans to Langfuse with advanced features.

```typescript { .api }
/**
 * Creates a new LangfuseSpanProcessor instance.
 *
 * @param params - Configuration parameters for the processor
 */
class LangfuseSpanProcessor implements SpanProcessor {
  constructor(params?: LangfuseSpanProcessorParams);

  /**
   * Called when a span is started. Adds environment and release attributes.
   *
   * @param span - The span that was started
   * @param parentContext - The parent context
   */
  onStart(span: Span, parentContext: any): void;

  /**
   * Called when a span ends. Processes the span for export to Langfuse.
   * This method checks if the span should be exported, applies data masking,
   * handles media content extraction and upload, and passes the span to the
   * exporter.
   *
   * @param span - The span that ended
   */
  onEnd(span: ReadableSpan): void;

  /**
   * Forces an immediate flush of all pending spans and media uploads.
   *
   * @returns Promise that resolves when all pending operations are complete
   */
  forceFlush(): Promise<void>;

  /**
   * Gracefully shuts down the processor, ensuring all pending operations
   * are completed.
   *
   * @returns Promise that resolves when shutdown is complete
   */
  shutdown(): Promise<void>;
}
```

**Usage Example with Data Masking:**

```typescript
import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  environment: 'staging',
  mask: ({ data }) => {
    // Mask sensitive data like passwords and API keys
    if (typeof data === 'string') {
      return data
        .replace(/password=\w+/g, 'password=***')
        .replace(/api_key=[\w-]+/g, 'api_key=***');
    }
    return data;
  }
});
```

**Usage Example with Span Filtering:**

```typescript
import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  shouldExportSpan: ({ otelSpan }) => {
    // Only export spans from specific services or that meet certain criteria.
    // `service.name` is a resource attribute by OpenTelemetry convention,
    // so it is read from the span's resource rather than its own attributes.
    return otelSpan.name.startsWith('llm-') ||
      otelSpan.resource.attributes['service.name'] === 'my-ai-service';
  }
});
```

**Usage Example for Serverless:**

```typescript
import { LangfuseSpanProcessor } from '@langfuse/otel';

const processor = new LangfuseSpanProcessor({
  publicKey: 'pk_...',
  secretKey: 'sk_...',
  exportMode: 'immediate', // Export spans immediately in serverless
  timeout: 10 // Increase timeout for serverless cold starts
});
```
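
Even in immediate mode, media uploads or in-flight requests may still be pending when a handler returns, so it can help to await the documented `forceFlush()` before the function exits. The following is a minimal sketch under that assumption: `withFlush` and the `Flushable` interface are hypothetical names, and the interface is a structural stand-in so the snippet does not depend on the package itself.

```typescript
// Structural stand-in for the processor: only forceFlush() is needed here.
interface Flushable {
  forceFlush(): Promise<void>;
}

// Hypothetical helper: runs the handler, then flushes whether it
// returned normally or threw.
async function withFlush<T>(
  processor: Flushable,
  handler: () => Promise<T>,
): Promise<T> {
  try {
    return await handler();
  } finally {
    await processor.forceFlush();
  }
}
```

A `LangfuseSpanProcessor` instance satisfies `Flushable`, so a serverless entry point could wrap its body as `withFlush(processor, async () => { /* work */ })`.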

### Configuration

Configuration parameters for the LangfuseSpanProcessor.

```typescript { .api }
interface LangfuseSpanProcessorParams {
  /**
   * Custom OpenTelemetry span exporter. If not provided, a default OTLP
   * exporter will be used that sends spans to the Langfuse API.
   */
  exporter?: SpanExporter;

  /**
   * Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY
   * environment variable.
   */
  publicKey?: string;

  /**
   * Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY
   * environment variable.
   */
  secretKey?: string;

  /**
   * Langfuse instance base URL. Can also be set via LANGFUSE_BASE_URL
   * or LANGFUSE_BASEURL (legacy) environment variable.
   * @defaultValue "https://cloud.langfuse.com"
   */
  baseUrl?: string;

  /**
   * Number of spans to batch before flushing. Can also be set via
   * LANGFUSE_FLUSH_AT environment variable. Only applies when exportMode
   * is "batched".
   */
  flushAt?: number;

  /**
   * Flush interval in seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL
   * environment variable. Only applies when exportMode is "batched".
   */
  flushInterval?: number;

  /**
   * Function to mask sensitive data in spans before export. Applied to
   * span attributes containing input, output, and metadata fields.
   */
  mask?: MaskFunction;

  /**
   * Function to determine whether a span should be exported to Langfuse.
   * If this function returns false, the span will not be exported.
   */
  shouldExportSpan?: ShouldExportSpan;

  /**
   * Environment identifier for the traces. Can also be set via
   * LANGFUSE_TRACING_ENVIRONMENT environment variable. This value is added
   * as an attribute to all spans.
   */
  environment?: string;

  /**
   * Release identifier for the traces. Can also be set via LANGFUSE_RELEASE
   * environment variable. This value is added as an attribute to all spans.
   */
  release?: string;

  /**
   * Request timeout in seconds. Can also be set via LANGFUSE_TIMEOUT
   * environment variable.
   * @defaultValue 5
   */
  timeout?: number;

  /**
   * Additional HTTP headers to include with requests to the Langfuse API.
   */
  additionalHeaders?: Record<string, string>;

  /**
   * Span export mode to use.
   *
   * - **batched**: Recommended for production environments with long-running
   *   processes. Spans are batched and exported in groups for optimal
   *   performance.
   * - **immediate**: Recommended for short-lived environments such as
   *   serverless functions. Spans are exported immediately to prevent data
   *   loss when the process terminates or is frozen.
   *
   * @defaultValue "batched"
   */
  exportMode?: "immediate" | "batched";
}
```

### Mask Function

Function type for masking sensitive data in spans before export.

```typescript { .api }
/**
 * Function type for masking sensitive data in spans before export.
 *
 * The mask function is applied to span attributes that may contain sensitive
 * information (input, output, and metadata fields). It receives the data and
 * should return a masked version of it.
 *
 * @param params - Object containing the data to be masked
 * @param params.data - The data that should be masked (can be of any type)
 * @returns The masked data (can be of any type)
 */
type MaskFunction = (params: { data: any }) => any;
```

**Usage Example:**

```typescript
import type { MaskFunction } from '@langfuse/otel';

const maskFunction: MaskFunction = ({ data }) => {
  if (typeof data === 'string') {
    // Mask credit card numbers
    data = data.replace(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/g, 'XXXX-XXXX-XXXX-XXXX');
    // Mask email addresses
    data = data.replace(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, '***@***.***');
  } else if (typeof data === 'object' && data !== null) {
    // Deep clone and mask top-level object properties
    const masked = JSON.parse(JSON.stringify(data));
    if ('password' in masked) masked.password = '***';
    if ('apiKey' in masked) masked.apiKey = '***';
    return masked;
  }
  return data;
};
```
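
The example above only rewrites `password` and `apiKey` at the top level of an object. Where sensitive keys can appear at any depth, a recursive walk may be a better fit. The sketch below assumes JSON-like data (plain objects, arrays, and primitives); `redactDeep` and `SENSITIVE_KEYS` are hypothetical names, and the key list is only an illustration.

```typescript
// Hypothetical key list; adjust it to your own payloads.
const SENSITIVE_KEYS = new Set(["password", "apiKey", "authorization"]);

// Returns a redacted copy of JSON-like data without mutating the input.
function redactDeep(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map((item) => redactDeep(item));
  }
  if (typeof value === "object" && value !== null) {
    const out: Record<string, unknown> = {};
    for (const [key, child] of Object.entries(value)) {
      out[key] = SENSITIVE_KEYS.has(key) ? "***" : redactDeep(child);
    }
    return out;
  }
  // Strings, numbers, booleans, null, and undefined pass through unchanged.
  return value;
}

// Same shape as the documented MaskFunction: (params: { data: any }) => any.
const mask = ({ data }: { data: unknown }): unknown => redactDeep(data);
```

Because the walk builds new objects instead of editing in place, the original span data is left untouched; non-plain objects such as `Date` are not preserved by this sketch.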

### Should Export Span Function

Function type for determining whether a span should be exported to Langfuse.

```typescript { .api }
/**
 * Function type for determining whether a span should be exported to Langfuse.
 *
 * This function is called for each span before it is exported. If it returns
 * false, the span will not be exported to Langfuse.
 *
 * @param params - Object containing the span to evaluate
 * @param params.otelSpan - The OpenTelemetry span to evaluate
 * @returns true if the span should be exported, false otherwise
 */
type ShouldExportSpan = (params: { otelSpan: ReadableSpan }) => boolean;
```

**Usage Example:**

```typescript
import type { ShouldExportSpan } from '@langfuse/otel';

const shouldExportSpan: ShouldExportSpan = ({ otelSpan }) => {
  // Only export spans that took longer than 100ms.
  // `duration` is an HrTime tuple: [seconds, nanoseconds].
  const durationMs = otelSpan.duration[0] * 1000 + otelSpan.duration[1] / 1000000;
  if (durationMs <= 100) return false;

  // Don't export health check spans
  if (otelSpan.name.includes('health-check')) return false;

  // Only export spans from production environment.
  // `deployment.environment` is a resource attribute by OpenTelemetry convention.
  const env = otelSpan.resource.attributes['deployment.environment'];
  return env === 'production';
};
```
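
The duration arithmetic in the example relies on OpenTelemetry's `HrTime` format, a `[seconds, nanoseconds]` tuple. Pulling the conversion into a small helper makes the unit handling easier to check. This is a minimal sketch; `hrTimeToMs` and `isSlowerThan` are hypothetical local names (`@opentelemetry/core` ships a comparable `hrTimeToMilliseconds` helper).

```typescript
// OpenTelemetry's HrTime is a [seconds, nanoseconds] tuple.
type HrTime = [number, number];

// Converts an HrTime tuple to milliseconds.
function hrTimeToMs([seconds, nanos]: HrTime): number {
  return seconds * 1000 + nanos / 1000000;
}

// Hypothetical predicate: true only when the span ran strictly longer
// than the threshold.
function isSlowerThan(duration: HrTime, thresholdMs: number): boolean {
  return hrTimeToMs(duration) > thresholdMs;
}
```

Inside a `ShouldExportSpan` function this could be used as `isSlowerThan(otelSpan.duration, 100)`.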

## Environment Variables

The package can be configured using the following environment variables:

- `LANGFUSE_PUBLIC_KEY` - Langfuse public API key
- `LANGFUSE_SECRET_KEY` - Langfuse secret API key
- `LANGFUSE_BASE_URL` - Langfuse instance base URL (defaults to "https://cloud.langfuse.com")
- `LANGFUSE_BASEURL` - Legacy alternative for LANGFUSE_BASE_URL
- `LANGFUSE_FLUSH_AT` - Number of spans to batch before flushing (batched mode only)
- `LANGFUSE_FLUSH_INTERVAL` - Flush interval in seconds (batched mode only)
- `LANGFUSE_TRACING_ENVIRONMENT` - Environment identifier for traces
- `LANGFUSE_RELEASE` - Release identifier for traces
- `LANGFUSE_TIMEOUT` - Request timeout in seconds (defaults to 5)
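
The processor reads these variables itself, so no extra code is required. For startup validation or logging, though, it can be useful to see how they map onto the documented parameters. The sketch below is illustrative only: `paramsFromEnv`, `EnvParams`, and `parseNumber` are hypothetical names, and preferring `LANGFUSE_BASE_URL` over the legacy `LANGFUSE_BASEURL` is an assumption, not documented behavior.

```typescript
// Subset of the documented parameters that have an environment variable.
interface EnvParams {
  publicKey: string | undefined;
  secretKey: string | undefined;
  baseUrl: string;
  flushAt: number | undefined;
  flushInterval: number | undefined;
  environment: string | undefined;
  release: string | undefined;
  timeout: number;
}

type Env = Record<string, string | undefined>;

// Parses a numeric variable; returns undefined when unset or not a number.
function parseNumber(value: string | undefined): number | undefined {
  if (value === undefined || value.trim() === "") return undefined;
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : undefined;
}

function paramsFromEnv(env: Env): EnvParams {
  return {
    publicKey: env["LANGFUSE_PUBLIC_KEY"],
    secretKey: env["LANGFUSE_SECRET_KEY"],
    // Assumption: the current name wins over the legacy one.
    baseUrl:
      env["LANGFUSE_BASE_URL"] ??
      env["LANGFUSE_BASEURL"] ??
      "https://cloud.langfuse.com",
    flushAt: parseNumber(env["LANGFUSE_FLUSH_AT"]),
    flushInterval: parseNumber(env["LANGFUSE_FLUSH_INTERVAL"]),
    environment: env["LANGFUSE_TRACING_ENVIRONMENT"],
    release: env["LANGFUSE_RELEASE"],
    timeout: parseNumber(env["LANGFUSE_TIMEOUT"]) ?? 5,
  };
}
```

In Node.js the function could be called as `paramsFromEnv(process.env)`, for example to warn early when either key is missing.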

## Peer Dependencies

This package requires the following peer dependencies to be installed:

- `@opentelemetry/api` ^1.9.0
- `@opentelemetry/core` ^2.0.1
- `@opentelemetry/exporter-trace-otlp-http` >=0.202.0 <1.0.0
- `@opentelemetry/sdk-trace-base` ^2.0.1

## Types

The package uses OpenTelemetry types from peer dependencies:

- `Span` - from `@opentelemetry/sdk-trace-base`
- `ReadableSpan` - from `@opentelemetry/sdk-trace-base`
- `SpanProcessor` - from `@opentelemetry/sdk-trace-base`
- `SpanExporter` - from `@opentelemetry/sdk-trace-base`

## Error Handling

The processor handles errors gracefully:

- If a `MaskFunction` throws an error, the affected attribute is fully masked with the string `"<fully masked due to failed mask function>"` and a warning is logged
- If a `ShouldExportSpan` function throws an error, the span is excluded from export and an error is logged
- Media upload failures are logged as errors but do not prevent span export
- Authentication failures (missing public/secret key) are logged as warnings during initialization
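
The first two fallbacks can be pictured as thin wrappers around the user-supplied callbacks. The sketch below is an illustration of the documented behavior, not the library's source; `applyMaskSafely` and `shouldExportSafely` are hypothetical names, the local `MaskFunction` type mirrors the documented one, and logging is omitted.

```typescript
const FULLY_MASKED = "<fully masked due to failed mask function>";

// Mirrors the documented MaskFunction type.
type MaskFunction = (params: { data: any }) => any;

// If masking throws, hide the value entirely rather than leak it.
function applyMaskSafely(mask: MaskFunction, data: unknown): unknown {
  try {
    return mask({ data });
  } catch {
    return FULLY_MASKED; // the library also logs a warning here
  }
}

// If the filter throws, treat the span as not exportable.
function shouldExportSafely<S>(
  filter: (params: { otelSpan: S }) => boolean,
  otelSpan: S,
): boolean {
  try {
    return filter({ otelSpan });
  } catch {
    return false; // the library also logs an error here
  }
}
```

Both fallbacks fail closed: a broken mask never leaks the raw value, and a broken filter never exports a span that might have been meant to stay private.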
