or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.md · core-tracing.md · datasets.md · experiments.md · index.md · integrations.md · observation-types.md · prompts.md · scoring.md

docs/advanced.md

0

# Advanced Features

1

2

Support for media uploads, data masking, multi-project setups, and advanced configuration options for complex production environments and specialized use cases.

3

4

## Capabilities

5

6

### Media Handling

7

8

Support for uploading and managing media files with traces and observations.

9

10

```python { .api }

11

class LangfuseMedia:
    """Media content that can be attached to traces and observations."""

    def __init__(self, *, obj: Optional[object] = None, base64_data_uri: Optional[str] = None,
                 content_type: Optional[MediaContentType] = None, content_bytes: Optional[bytes] = None,
                 file_path: Optional[str] = None):
        """Initialize media object for Langfuse.

        All parameters are keyword-only and default to None.

        Args:
            obj: Source object to wrap (images, files, etc.)
            base64_data_uri: Base64 encoded data URI
            content_type: MIME type of the media content
            content_bytes: Raw bytes content
            file_path: Path to file to upload

        Note:
            Provide one of: obj, base64_data_uri, content_bytes + content_type,
            or file_path. content_bytes must be accompanied by content_type.
        """

    @staticmethod
    def parse_reference_string(reference: str) -> ParsedMediaReference:
        """Parse media reference string into components.

        Args:
            reference: Media reference string from Langfuse

        Returns:
            ParsedMediaReference with parsed components
        """

    @staticmethod
    def resolve_media_references(data: Any) -> Any:
        """Replace media references in data with actual content.

        Args:
            data: Data structure potentially containing media references

        Returns:
            Data with media references resolved to content
        """
        # NOTE(review): signature reproduced as documented on this page;
        # confirm the parameter list against the installed SDK version.

49

```

50

51

### Trace and Context Management

52

53

Advanced utilities for working with trace context and IDs.

54

55

```python { .api }

56

class Langfuse:
    def create_trace_id(self) -> str:
        """Generate a unique trace ID.

        Returns:
            New trace ID string
        """

    def get_current_trace_id(self) -> Optional[str]:
        """Get current trace ID from execution context.

        Returns:
            Current trace ID or None if no active trace
        """
        # NOTE(review): this page also listed "Raises: Exception: If no
        # current trace found", which contradicts the None return documented
        # above. The None-return contract is kept here; confirm against the
        # installed SDK before relying on either behavior.

    def get_current_observation_id(self) -> Optional[str]:
        """Get current observation ID from execution context.

        Returns:
            Current observation ID or None if no active observation
        """
        # NOTE(review): this page also listed "Raises: Exception: If no
        # current observation found", which contradicts the None return
        # documented above; confirm against the installed SDK.

    def get_trace_url(self, trace_id: str) -> str:
        """Get URL to view trace in Langfuse UI.

        Args:
            trace_id: Trace ID to generate URL for

        Returns:
            URL string for viewing trace in Langfuse dashboard
        """

93

```

94

95

### Authentication and Health Checks

96

97

Methods for validating client configuration and connectivity.

98

99

```python { .api }

100

class Langfuse:
    def auth_check(self) -> bool:
        """Verify API authentication credentials.

        Returns:
            True if authentication is valid, False otherwise

        Raises:
            Exception: If unable to perform authentication check (the check
                itself could not be completed, as opposed to completing with
                invalid credentials)
        """

110

```

111

112

### Multi-Project Support

113

114

Support for managing multiple Langfuse projects within a single application.

115

116

```python { .api }

117

def get_client(*, public_key: Optional[str] = None) -> Langfuse:
    """Get or create Langfuse client for multi-project setups.

    Args:
        public_key: Project identifier for specific project (keyword-only)

    Returns:
        Langfuse client instance for the specified project

    Note:
        - Without public_key: returns the single existing client, or a
          disabled client if multiple clients exist
        - With public_key: returns the client for that specific project
        - Multi-project support is experimental
    """

131

132

# In multi-project setups, @observe-decorated functions take an extra
# keyword that selects the project the call is traced to.
def decorated_function(data, langfuse_public_key=None):
    """Functions decorated with @observe accept langfuse_public_key parameter."""
    return None

136

```

137

138

### Data Masking and Privacy

139

140

Protocol and utilities for implementing data masking in traces.

141

142

```python { .api }

143

# Masking function protocol: any callable accepting a keyword-only `data`
# argument and returning the masked value satisfies it.
class MaskFunction(Protocol):
    def __call__(self, *, data: Any) -> Any:
        """Transform data to mask sensitive information.

        Args:
            data: Original data to mask (passed by keyword)

        Returns:
            Masked version of the data
        """

154

155

class Langfuse:
    def __init__(self, *, mask: Optional[MaskFunction] = None, **kwargs):
        """Initialize Langfuse client with optional data masking.

        Args:
            mask: Function to apply data masking to all trace data;
                defaults to None
            **kwargs: Other initialization parameters
        """

163

```

164

165

### Advanced Configuration Options

166

167

Extended configuration for production and specialized environments.

168

169

```python { .api }

170

class Langfuse:
    def __init__(self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None,
                 host: str = "https://cloud.langfuse.com", tracing_enabled: bool = True,
                 environment: Optional[str] = None, timeout: int = 60, flush_at: int = 15,
                 flush_interval: float = 0.5, release: Optional[str] = None,
                 sample_rate: float = 1.0, mask: Optional[MaskFunction] = None,
                 additional_headers: Optional[Dict[str, str]] = None,
                 blocked_instrumentation_scopes: Optional[List[str]] = None,
                 media_upload_thread_count: int = 3):
        """Initialize Langfuse client with advanced configuration.

        All parameters are keyword-only.

        Args:
            public_key: Project public key
            secret_key: Project secret key
            host: Langfuse server URL (default "https://cloud.langfuse.com")
            tracing_enabled: Global tracing enable/disable (default True)
            environment: Environment tag for all traces
            timeout: Request timeout in seconds (default 60)
            flush_at: Number of events to batch before flushing (default 15)
            flush_interval: Time interval between flushes in seconds
                (default 0.5)
            release: Release/version identifier
            sample_rate: Sampling rate (0.0 to 1.0) for traces (default 1.0)
            mask: Data masking function
            additional_headers: Additional HTTP headers for requests
            blocked_instrumentation_scopes: OpenTelemetry scopes to block
            media_upload_thread_count: Number of threads for media uploads
                (default 3)
        """

197

```

198

199

### Type Definitions for Advanced Features

200

201

Supporting types for advanced functionality.

202

203

```python { .api }

204

# Media content types are plain MIME type strings (for example "image/jpeg")
MediaContentType = str


# Components obtained from parsing a media reference string
class ParsedMediaReference(TypedDict):
    type: str
    id: str
    url: str


# Trace context: the trace ID plus an optional parent span ID
class TraceContext(TypedDict):
    trace_id: str
    parent_span_id: Optional[str]


# Value types permitted in flexible attribute maps
MapValue = Union[str, int, float, bool, List[Any], Dict[str, Any]]

222

```

223

224

## Usage Examples

225

226

### Media Upload and Handling

227

228

```python

229

from langfuse import Langfuse, LangfuseMedia

230

import base64

231

from PIL import Image

232

233

langfuse = Langfuse()

# Upload image file
image_media = LangfuseMedia(file_path="path/to/image.jpg")

with langfuse.start_as_current_span(name="image-analysis") as span:
    # Include media in trace
    span.update(
        input={"image": image_media, "prompt": "Analyze this image"},
        output={"description": "A beautiful landscape with mountains"}
    )

# Upload from bytes
with open("image.jpg", "rb") as f:
    image_bytes = f.read()

bytes_media = LangfuseMedia(
    content_bytes=image_bytes,
    content_type="image/jpeg"
)


# Upload from base64 data URI
def image_to_base64_uri(image_path, content_type="image/jpeg"):
    """Read a file and return its contents as a base64 data URI.

    Args:
        image_path: Path of the file to encode
        content_type: MIME type written into the URI; defaults to
            "image/jpeg" so existing one-argument calls are unchanged

    Returns:
        String of the form "data:<content_type>;base64,<payload>"
    """
    with open(image_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode()
    return f"data:{content_type};base64,{encoded}"


uri_media = LangfuseMedia(base64_data_uri=image_to_base64_uri("image.jpg"))

# Upload Python object (e.g., matplotlib figure)
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.plot([1, 2, 3, 4], [1, 4, 2, 3])

figure_media = LangfuseMedia(obj=fig)

# The "image-analysis" span ended when its `with` block exited, so the
# figure is attached inside a new span instead of updating the finished one.
with langfuse.start_as_current_span(name="plot-analysis") as span:
    span.update(input={"plot": figure_media})

271

```

272

273

### Data Masking Implementation

274

275

```python

276

import re

277

from typing import Any

278

279

class DataMasker:
    """Custom data masking implementation.

    Instances are callables matching the masking protocol: they take a
    keyword-only ``data`` argument and return a copy in which every string
    (at any depth inside dicts and lists) has sensitive substrings replaced
    by ``[MASKED_<PATTERN NAME>]`` placeholders.
    """

    def __init__(self):
        # Common sensitive patterns, applied in insertion order.
        # The email TLD class is [A-Za-z]; a '|' inside a character class is
        # a literal pipe, not alternation, so it must not appear there.
        self.patterns = {
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            'phone': re.compile(r'\b\d{3}-\d{3}-\d{4}\b'),
            'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            'credit_card': re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b')
        }

    def __call__(self, *, data: Any) -> Any:
        """Mask sensitive data according to privacy requirements.

        Args:
            data: Value to mask; strings, dicts and lists are processed,
                any other type is returned unchanged

        Returns:
            Masked version of ``data`` (new dict/list objects are built;
            the input containers are not modified)
        """
        return self._mask_recursive(data)

    def _mask_recursive(self, obj):
        """Recursively mask data in nested structures.

        Dict keys are left as-is; only values are masked.
        """
        if isinstance(obj, str):
            return self._mask_string(obj)
        if isinstance(obj, dict):
            return {k: self._mask_recursive(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._mask_recursive(item) for item in obj]
        return obj

    def _mask_string(self, text: str) -> str:
        """Apply masking patterns to string content."""
        masked = text
        for pattern_name, pattern in self.patterns.items():
            masked = pattern.sub(f'[MASKED_{pattern_name.upper()}]', masked)
        return masked

312

313

# Initialize Langfuse with data masking: the DataMasker instance is passed
# as the client's `mask` function
masker = DataMasker()
langfuse = Langfuse(mask=masker)

# All trace data automatically masked
with langfuse.start_as_current_span(name="process-user-data") as span:
    user_input = "My email is john@example.com and phone is 555-123-4567"
    span.update(input=user_input)
    # Stored as: "My email is [MASKED_EMAIL] and phone is [MASKED_PHONE]"

322

```

323

324

### Multi-Project Configuration

325

326

```python

327

from langfuse import Langfuse, observe

# Initialize multiple clients for different projects
project_a = Langfuse(
    public_key="project-a-key",
    secret_key="project-a-secret",
    environment="production"
)

project_b = Langfuse(
    public_key="project-b-key",
    secret_key="project-b-secret",
    environment="staging"
)

# Use specific clients
# (process_for_project_a is a placeholder for application code)
with project_a.start_as_current_span(name="project-a-task") as span:
    result_a = process_for_project_a()
    span.update(output=result_a)


# Use @observe with langfuse_public_key for dynamic project selection
@observe  # Uses default client resolution when no key is passed
def shared_function(data, langfuse_public_key=None):
    # Function uses client associated with langfuse_public_key
    # (process_data is a placeholder for application code)
    return process_data(data)


# Call with specific project
result = shared_function(data, langfuse_public_key="project-a-key")

353

```

354

355

### Advanced Client Configuration

356

357

```python

358

import os

from langfuse import Langfuse

# Production configuration with all options
production_langfuse = Langfuse(
    # Credentials come from the environment rather than source code
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host="https://your-langfuse-instance.com",
    environment="production",
    release="v1.2.3",
    sample_rate=0.1,  # Sample 10% of traces
    flush_at=50,  # Batch 50 events before flushing
    flush_interval=2.0,  # Flush every 2 seconds
    timeout=30,  # 30 second timeout
    media_upload_thread_count=5,  # 5 threads for media uploads
    additional_headers={
        "X-Custom-Header": "custom-value",
        "Authorization-Proxy": "proxy-token"
    },
    blocked_instrumentation_scopes=[
        "httpx",  # Block httpx instrumentation
        "requests"  # Block requests instrumentation
    ]
)

379

```

380

381

### Trace Context Management

382

383

```python

384

# Manual trace context management
trace_id = langfuse.create_trace_id()
print(f"Created trace: {trace_id}")

# Use explicit trace context: the pre-generated ID is supplied through a
# TraceContext mapping ('trace_id', optional 'parent_span_id') rather than a
# bare trace_id keyword.
# NOTE(review): confirm the `trace_context` parameter against the installed
# SDK version.
with langfuse.start_as_current_span(
    name="main-process",
    trace_context={"trace_id": trace_id},
) as main_span:
    # Get current context
    current_trace = langfuse.get_current_trace_id()
    current_observation = langfuse.get_current_observation_id()

    print(f"Current trace: {current_trace}")
    print(f"Current observation: {current_observation}")

    # Create child with explicit parent
    child_span = main_span.start_observation(
        name="child-process",
        as_type="span"
    )

    # The child is not managed by a `with` block, so it must be ended
    # explicitly even when child_process() raises.
    try:
        result = child_process()
        child_span.update(output=result)
    finally:
        child_span.end()

# Generate UI link (passed by keyword so the call does not depend on the
# parameter being positional)
trace_url = langfuse.get_trace_url(trace_id=trace_id)
print(f"View trace: {trace_url}")

412

```

413

414

### Sampling and Performance Optimization

415

416

```python

417

import random


class SmartSampler:
    """Custom sampling logic for different scenarios.

    Requests are classified, in priority order, as errors, slow requests,
    or ordinary requests, and each class is sampled at its own rate.
    """

    def __init__(self, base_rate=0.1, error_rate=1.0, slow_request_rate=0.5,
                 slow_threshold=5.0):
        """Configure per-class sampling rates.

        Args:
            base_rate: Probability of sampling an ordinary request
            error_rate: Probability of sampling a request that errored
                (default 1.0: always sample errors)
            slow_request_rate: Probability of sampling a slow request
                (default 0.5: sample 50% of slow requests)
            slow_threshold: Execution time in seconds above which a request
                counts as slow (default 5.0)
        """
        self.base_rate = base_rate
        self.error_rate = error_rate
        self.slow_request_rate = slow_request_rate
        self.slow_threshold = slow_threshold

    def should_sample(self, context):
        """Determine if request should be sampled.

        Args:
            context: Mapping that may contain "has_error" (truthy when the
                request failed) and "execution_time" (seconds; treated as 0
                when absent)

        Returns:
            True if the request should be traced, False otherwise
        """
        # Errors take priority. random.random() is in [0.0, 1.0), so the
        # default error_rate of 1.0 always samples.
        if context.get("has_error"):
            return random.random() < self.error_rate

        # Higher sampling for slow requests
        if context.get("execution_time", 0) > self.slow_threshold:
            return random.random() < self.slow_request_rate

        # Base sampling rate
        return random.random() < self.base_rate

439

440

import time

# Implementation with conditional tracing
sampler = SmartSampler()


def traced_function(input_data):
    """Run process_data and trace the call only if the sampler selects it.

    The sampling decision is made after execution so that it can take the
    outcome (error or not) and the duration into account.

    Args:
        input_data: Value passed through to process_data

    Returns:
        The result of process_data(input_data)

    Raises:
        Exception: Whatever process_data raises is re-raised after the
            sampling/tracing step has run
    """
    # perf_counter is monotonic, so the measured duration cannot go negative
    # if the system clock is adjusted mid-call.
    start_time = time.perf_counter()
    has_error = False
    result = None

    try:
        result = process_data(input_data)
    except Exception:
        has_error = True
        raise
    finally:
        execution_time = time.perf_counter() - start_time

        # Determine if we should trace this execution
        context = {
            "has_error": has_error,
            "execution_time": execution_time
        }

        if sampler.should_sample(context):
            # Create trace retrospectively if needed
            with langfuse.start_as_current_span(name="sampled-function") as span:
                span.update(
                    input=input_data,
                    output=result,
                    metadata={
                        "execution_time": execution_time,
                        "sampled": True
                    }
                )

                if has_error:
                    span.update(level="ERROR")

    # Deliberately outside `finally`: a return inside it would swallow the
    # exception re-raised above.
    return result

478

```

479

480

### Health Monitoring and Diagnostics

481

482

```python

483

import time
from datetime import datetime


class LangfuseHealthMonitor:
    """Monitor Langfuse client health and connectivity."""

    def __init__(self, langfuse_client):
        """Store the client whose health will be checked.

        Args:
            langfuse_client: Object providing auth_check(),
                create_trace_id() and start_as_current_span(name=...)
        """
        self.client = langfuse_client

    def run_diagnostics(self):
        """Run comprehensive health checks.

        Returns:
            Dict with keys "timestamp" (ISO-8601 string), "checks" (mapping
            of check name to {"status", "message"}, where status is "pass",
            "fail" or "error") and "overall_status" ("healthy" only when
            every check passed, otherwise "unhealthy")
        """
        diagnostics = {
            "timestamp": datetime.now().isoformat(),
            "checks": {}
        }
        checks = diagnostics["checks"]

        checks["authentication"] = self._run_check(
            self._check_authentication, "Auth check failed")
        checks["trace_creation"] = self._run_check(
            self._check_trace_creation, "Trace creation failed")
        checks["span_creation"] = self._run_check(
            self._check_span_creation, "Span creation failed")

        # Overall status
        all_passed = all(check["status"] == "pass" for check in checks.values())
        diagnostics["overall_status"] = "healthy" if all_passed else "unhealthy"

        return diagnostics

    @staticmethod
    def _run_check(check, failure_prefix):
        """Run one check, converting any exception into an "error" result."""
        try:
            return check()
        except Exception as e:
            return {"status": "error", "message": f"{failure_prefix}: {e}"}

    def _check_authentication(self):
        """Report whether the client's credentials are accepted."""
        auth_ok = self.client.auth_check()
        return {
            "status": "pass" if auth_ok else "fail",
            "message": "Authentication successful" if auth_ok else "Authentication failed"
        }

    def _check_trace_creation(self):
        """Report whether a trace ID can be generated."""
        trace_id = self.client.create_trace_id()
        return {
            "status": "pass",
            # Only a prefix is reported to keep the message short
            "message": f"Trace ID generated: {trace_id[:8]}..."
        }

    def _check_span_creation(self):
        """Report whether a span can be opened, updated and closed."""
        with self.client.start_as_current_span(name="health-check") as span:
            span.update(output="Health check successful")
        return {
            "status": "pass",
            "message": "Span creation and management successful"
        }

    def continuous_monitoring(self, interval=300):
        """Run continuous health monitoring.

        Runs diagnostics, prints the overall status (plus the message of
        every non-passing check), sleeps, and repeats until interrupted with
        Ctrl+C.

        Args:
            interval: Seconds to sleep between diagnostic runs
        """
        # KeyboardInterrupt is handled around the whole loop so that a
        # Ctrl+C arriving during time.sleep() also stops monitoring cleanly.
        try:
            while True:
                try:
                    results = self.run_diagnostics()
                    print(f"Health check: {results['overall_status']}")

                    if results["overall_status"] != "healthy":
                        for check_name, check_result in results["checks"].items():
                            if check_result["status"] != "pass":
                                print(f" {check_name}: {check_result['message']}")
                except Exception as e:
                    # A failed monitoring pass is reported, not fatal
                    print(f"Health monitoring error: {e}")

                time.sleep(interval)
        except KeyboardInterrupt:
            return

563

564

# Usage

565

monitor = LangfuseHealthMonitor(langfuse)

566

health_report = monitor.run_diagnostics()

567

print(health_report)

568

```

569

570

### Environment-Specific Configuration

571

572

```python

573

import os

574

from typing import Optional

575

576

class LangfuseFactory:
    """Builds Langfuse clients with per-environment settings."""

    @classmethod
    def create_for_environment(cls, environment: Optional[str] = None) -> Langfuse:
        """Return a Langfuse client configured for the given environment.

        Args:
            environment: Environment name; when empty or None, the
                ENVIRONMENT variable is read, defaulting to "development"

        Returns:
            Langfuse client tagged with the resolved environment name. Names
            without a dedicated configuration use the development settings.
        """
        if environment:
            env = environment
        else:
            env = os.getenv("ENVIRONMENT", "development")

        # Immediate flushing for dev
        development = dict(tracing_enabled=True, sample_rate=1.0, flush_at=1, timeout=10)
        staging = dict(tracing_enabled=True, sample_rate=0.5, flush_at=10, timeout=20)
        # Lower sampling in prod
        production = dict(
            tracing_enabled=True,
            sample_rate=0.1,
            flush_at=50,
            timeout=30,
            additional_headers={"X-Service": "ai-service"},
        )
        # Disable for tests
        test = dict(tracing_enabled=False)

        known = {
            "development": development,
            "staging": staging,
            "production": production,
            "test": test,
        }

        settings = known[env] if env in known else development
        return Langfuse(environment=env, **settings)

616

617

# Usage

618

langfuse = LangfuseFactory.create_for_environment()

619

620

# Override for specific cases

621

production_client = LangfuseFactory.create_for_environment("production")

622

```