0
# Advanced Features
1
2
Support for media uploads, data masking, multi-project setups, and advanced configuration options for complex production environments and specialized use cases.
3
4
## Capabilities
5
6
### Media Handling
7
8
Support for uploading and managing media files with traces and observations.
9
10
```python { .api }
11
class LangfuseMedia:
12
def __init__(self, *, obj: object = None, base64_data_uri: str = None,
13
content_type: MediaContentType = None, content_bytes: bytes = None,
14
file_path: str = None):
15
"""Initialize media object for Langfuse.
16
17
Args:
18
obj: Source object to wrap (images, files, etc.)
19
base64_data_uri: Base64 encoded data URI
20
content_type: MIME type of the media content
21
content_bytes: Raw bytes content
22
file_path: Path to file to upload
23
24
Note:
25
Provide one of: obj, base64_data_uri, content_bytes + content_type, or file_path
26
"""
27
28
@staticmethod
29
def parse_reference_string(reference: str) -> ParsedMediaReference:
30
"""Parse media reference string into components.
31
32
Args:
33
reference: Media reference string from Langfuse
34
35
Returns:
36
ParsedMediaReference with parsed components
37
"""
38
39
@staticmethod
40
def resolve_media_references(data: Any) -> Any:
41
"""Replace media references in data with actual content.
42
43
Args:
44
data: Data structure potentially containing media references
45
46
Returns:
47
Data with media references resolved to content
48
"""
49
```
50
51
### Trace and Context Management
52
53
Advanced utilities for working with trace context and IDs.
54
55
```python { .api }
56
class Langfuse:
57
def create_trace_id(self) -> str:
58
"""Generate a unique trace ID.
59
60
Returns:
61
New trace ID string
62
"""
63
64
def get_current_trace_id(self) -> str:
65
"""Get current trace ID from execution context.
66
67
Returns:
68
Current trace ID or None if no active trace
69
70
Raises:
71
Exception: If no current trace found
72
"""
73
74
def get_current_observation_id(self) -> str:
75
"""Get current observation ID from execution context.
76
77
Returns:
78
Current observation ID or None if no active observation
79
80
Raises:
81
Exception: If no current observation found
82
"""
83
84
def get_trace_url(self, trace_id: str) -> str:
85
"""Get URL to view trace in Langfuse UI.
86
87
Args:
88
trace_id: Trace ID to generate URL for
89
90
Returns:
91
URL string for viewing trace in Langfuse dashboard
92
"""
93
```
94
95
### Authentication and Health Checks
96
97
Methods for validating client configuration and connectivity.
98
99
```python { .api }
100
class Langfuse:
101
def auth_check(self) -> bool:
102
"""Verify API authentication credentials.
103
104
Returns:
105
True if authentication is valid, False otherwise
106
107
Raises:
108
Exception: If unable to perform authentication check
109
"""
110
```
111
112
### Multi-Project Support
113
114
Support for managing multiple Langfuse projects within a single application.
115
116
```python { .api }
117
def get_client(*, public_key: str = None) -> Langfuse:
118
"""Get or create Langfuse client for multi-project setups.
119
120
Args:
121
public_key: Project identifier for specific project
122
123
Returns:
124
Langfuse client instance for the specified project
125
126
Note:
127
- Without public_key: Returns single client or disabled client if multiple exist
128
- With public_key: Returns client for that specific project
129
- Multi-project support is experimental
130
"""
131
132
# Special parameters for observe decorator in multi-project setups
133
def decorated_function(data, langfuse_public_key=None):
134
"""Functions decorated with @observe accept langfuse_public_key parameter."""
135
pass
136
```
137
138
### Data Masking and Privacy
139
140
Protocol and utilities for implementing data masking in traces.
141
142
```python { .api }
143
# Masking function protocol
144
class MaskFunction(Protocol):
145
def __call__(self, *, data: Any) -> Any:
146
"""Transform data to mask sensitive information.
147
148
Args:
149
data: Original data to mask
150
151
Returns:
152
Masked version of the data
153
"""
154
155
class Langfuse:
156
def __init__(self, *, mask: MaskFunction = None, **kwargs):
157
"""Initialize Langfuse client with optional data masking.
158
159
Args:
160
mask: Function to apply data masking to all trace data
161
**kwargs: Other initialization parameters
162
"""
163
```
164
165
### Advanced Configuration Options
166
167
Extended configuration for production and specialized environments.
168
169
```python { .api }
170
class Langfuse:
171
def __init__(self, *, public_key: str = None, secret_key: str = None,
172
host: str = "https://cloud.langfuse.com", tracing_enabled: bool = True,
173
environment: str = None, timeout: int = 60, flush_at: int = 15,
174
flush_interval: float = 0.5, release: str = None,
175
sample_rate: float = 1.0, mask: MaskFunction = None,
176
additional_headers: Dict[str, str] = None,
177
blocked_instrumentation_scopes: List[str] = None,
178
media_upload_thread_count: int = 3):
179
"""Initialize Langfuse client with advanced configuration.
180
181
Args:
182
public_key: Project public key
183
secret_key: Project secret key
184
host: Langfuse server URL
185
tracing_enabled: Global tracing enable/disable
186
environment: Environment tag for all traces
187
timeout: Request timeout in seconds
188
flush_at: Number of events to batch before flushing
189
flush_interval: Time interval between flushes in seconds
190
release: Release/version identifier
191
sample_rate: Sampling rate (0.0 to 1.0) for traces
192
mask: Data masking function
193
additional_headers: Additional HTTP headers for requests
194
blocked_instrumentation_scopes: OpenTelemetry scopes to block
195
media_upload_thread_count: Number of threads for media uploads
196
"""
197
```
198
199
### Type Definitions for Advanced Features
200
201
Supporting types for advanced functionality.
202
203
```python { .api }
204
# Media content types
205
MediaContentType = str # MIME type strings
206
207
# Parsed media reference structure
208
ParsedMediaReference = TypedDict('ParsedMediaReference', {
209
'type': str,
210
'id': str,
211
'url': str
212
})
213
214
# Trace context structure
215
TraceContext = TypedDict('TraceContext', {
216
'trace_id': str,
217
'parent_span_id': Optional[str]
218
})
219
220
# Map value type for flexible attributes
221
MapValue = Union[str, int, float, bool, List[Any], Dict[str, Any]]
222
```
223
224
## Usage Examples
225
226
### Media Upload and Handling
227
228
```python
229
from langfuse import Langfuse, LangfuseMedia
230
import base64
231
from PIL import Image
232
233
langfuse = Langfuse()
234
235
# Upload image file
236
image_media = LangfuseMedia(file_path="path/to/image.jpg")
237
238
with langfuse.start_as_current_span(name="image-analysis") as span:
239
# Include media in trace
240
span.update(
241
input={"image": image_media, "prompt": "Analyze this image"},
242
output={"description": "A beautiful landscape with mountains"}
243
)
244
245
# Upload from bytes
246
with open("image.jpg", "rb") as f:
247
image_bytes = f.read()
248
249
bytes_media = LangfuseMedia(
250
content_bytes=image_bytes,
251
content_type="image/jpeg"
252
)
253
254
# Upload from base64 data URI
255
def image_to_base64_uri(image_path):
256
with open(image_path, "rb") as f:
257
encoded = base64.b64encode(f.read()).decode()
258
return f"data:image/jpeg;base64,{encoded}"
259
260
uri_media = LangfuseMedia(base64_data_uri=image_to_base64_uri("image.jpg"))
261
262
# Upload Python object (e.g., matplotlib figure)
263
import matplotlib.pyplot as plt
264
265
fig, ax = plt.subplots()
266
ax.plot([1, 2, 3, 4], [1, 4, 2, 3])
267
268
figure_media = LangfuseMedia(obj=fig)
269
270
span.update(input={"plot": figure_media})
271
```
272
273
### Data Masking Implementation
274
275
```python
276
import re
277
from typing import Any
278
279
class DataMasker:
280
"""Custom data masking implementation."""
281
282
def __init__(self):
283
# Common sensitive patterns
284
self.patterns = {
285
'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
286
'phone': re.compile(r'\b\d{3}-\d{3}-\d{4}\b'),
287
'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
288
'credit_card': re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b')
289
}
290
291
def __call__(self, *, data: Any) -> Any:
292
"""Mask sensitive data according to privacy requirements."""
293
return self._mask_recursive(data)
294
295
def _mask_recursive(self, obj):
296
"""Recursively mask data in nested structures."""
297
if isinstance(obj, str):
298
return self._mask_string(obj)
299
elif isinstance(obj, dict):
300
return {k: self._mask_recursive(v) for k, v in obj.items()}
301
elif isinstance(obj, list):
302
return [self._mask_recursive(item) for item in obj]
303
else:
304
return obj
305
306
def _mask_string(self, text: str) -> str:
307
"""Apply masking patterns to string content."""
308
masked = text
309
for pattern_name, pattern in self.patterns.items():
310
masked = pattern.sub(f'[MASKED_{pattern_name.upper()}]', masked)
311
return masked
312
313
# Initialize Langfuse with data masking
314
masker = DataMasker()
315
langfuse = Langfuse(mask=masker)
316
317
# All trace data automatically masked
318
with langfuse.start_as_current_span(name="process-user-data") as span:
319
user_input = "My email is john@example.com and phone is 555-123-4567"
320
span.update(input=user_input)
321
# Stored as: "My email is [MASKED_EMAIL] and phone is [MASKED_PHONE]"
322
```
323
324
### Multi-Project Configuration
325
326
```python
327
# Initialize multiple clients for different projects
328
project_a = Langfuse(
329
public_key="project-a-key",
330
secret_key="project-a-secret",
331
environment="production"
332
)
333
334
project_b = Langfuse(
335
public_key="project-b-key",
336
secret_key="project-b-secret",
337
environment="staging"
338
)
339
340
# Use specific clients
341
with project_a.start_as_current_span(name="project-a-task") as span:
342
result_a = process_for_project_a()
343
span.update(output=result_a)
344
345
# Use get_client for dynamic project selection
346
@observe # Uses default client resolution
347
def shared_function(data, langfuse_public_key=None):
348
# Function uses client associated with langfuse_public_key
349
return process_data(data)
350
351
# Call with specific project
352
result = shared_function(data, langfuse_public_key="project-a-key")
353
```
354
355
### Advanced Client Configuration
356
357
```python
358
# Production configuration with all options
359
production_langfuse = Langfuse(
360
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
361
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
362
host="https://your-langfuse-instance.com",
363
environment="production",
364
release="v1.2.3",
365
sample_rate=0.1, # Sample 10% of traces
366
flush_at=50, # Batch 50 events before flushing
367
flush_interval=2.0, # Flush every 2 seconds
368
timeout=30, # 30 second timeout
369
media_upload_thread_count=5, # 5 threads for media uploads
370
additional_headers={
371
"X-Custom-Header": "custom-value",
372
"Authorization-Proxy": "proxy-token"
373
},
374
blocked_instrumentation_scopes=[
375
"httpx", # Block httpx instrumentation
376
"requests" # Block requests instrumentation
377
]
378
)
379
```
380
381
### Trace Context Management
382
383
```python
384
# Manual trace context management
385
trace_id = langfuse.create_trace_id()
386
print(f"Created trace: {trace_id}")
387
388
# Use explicit trace context
389
with langfuse.start_as_current_span(name="main-process", trace_id=trace_id) as main_span:
390
# Get current context
391
current_trace = langfuse.get_current_trace_id()
392
current_observation = langfuse.get_current_observation_id()
393
394
print(f"Current trace: {current_trace}")
395
print(f"Current observation: {current_observation}")
396
397
# Create child with explicit parent
398
child_span = main_span.start_observation(
399
name="child-process",
400
as_type="span"
401
)
402
403
try:
404
result = child_process()
405
child_span.update(output=result)
406
finally:
407
child_span.end()
408
409
# Generate UI link
410
trace_url = langfuse.get_trace_url(trace_id)
411
print(f"View trace: {trace_url}")
412
```
413
414
### Sampling and Performance Optimization
415
416
```python
417
class SmartSampler:
418
"""Custom sampling logic for different scenarios."""
419
420
def __init__(self, base_rate=0.1):
421
self.base_rate = base_rate
422
self.error_rate = 1.0 # Always sample errors
423
self.slow_request_rate = 0.5 # Sample 50% of slow requests
424
425
def should_sample(self, context):
426
"""Determine if request should be sampled."""
427
import random
428
429
# Always sample errors
430
if context.get("has_error"):
431
return True
432
433
# Higher sampling for slow requests
434
if context.get("execution_time", 0) > 5.0:
435
return random.random() < self.slow_request_rate
436
437
# Base sampling rate
438
return random.random() < self.base_rate
439
440
# Implementation with conditional tracing
441
sampler = SmartSampler()
442
443
def traced_function(input_data):
444
start_time = time.time()
445
has_error = False
446
result = None
447
448
try:
449
result = process_data(input_data)
450
except Exception as e:
451
has_error = True
452
raise
453
finally:
454
execution_time = time.time() - start_time
455
456
# Determine if we should trace this execution
457
context = {
458
"has_error": has_error,
459
"execution_time": execution_time
460
}
461
462
if sampler.should_sample(context):
463
# Create trace retrospectively if needed
464
with langfuse.start_as_current_span(name="sampled-function") as span:
465
span.update(
466
input=input_data,
467
output=result,
468
metadata={
469
"execution_time": execution_time,
470
"sampled": True
471
}
472
)
473
474
if has_error:
475
span.update(level="ERROR")
476
477
return result
478
```
479
480
### Health Monitoring and Diagnostics
481
482
```python
483
class LangfuseHealthMonitor:
484
"""Monitor Langfuse client health and connectivity."""
485
486
def __init__(self, langfuse_client):
487
self.client = langfuse_client
488
489
def run_diagnostics(self):
490
"""Run comprehensive health checks."""
491
diagnostics = {
492
"timestamp": datetime.now().isoformat(),
493
"checks": {}
494
}
495
496
# Authentication check
497
try:
498
auth_ok = self.client.auth_check()
499
diagnostics["checks"]["authentication"] = {
500
"status": "pass" if auth_ok else "fail",
501
"message": "Authentication successful" if auth_ok else "Authentication failed"
502
}
503
except Exception as e:
504
diagnostics["checks"]["authentication"] = {
505
"status": "error",
506
"message": f"Auth check failed: {str(e)}"
507
}
508
509
# Trace creation test
510
try:
511
trace_id = self.client.create_trace_id()
512
diagnostics["checks"]["trace_creation"] = {
513
"status": "pass",
514
"message": f"Trace ID generated: {trace_id[:8]}..."
515
}
516
except Exception as e:
517
diagnostics["checks"]["trace_creation"] = {
518
"status": "error",
519
"message": f"Trace creation failed: {str(e)}"
520
}
521
522
# Span creation test
523
try:
524
with self.client.start_as_current_span(name="health-check") as span:
525
span.update(output="Health check successful")
526
diagnostics["checks"]["span_creation"] = {
527
"status": "pass",
528
"message": "Span creation and management successful"
529
}
530
except Exception as e:
531
diagnostics["checks"]["span_creation"] = {
532
"status": "error",
533
"message": f"Span creation failed: {str(e)}"
534
}
535
536
# Overall status
537
all_passed = all(
538
check["status"] == "pass"
539
for check in diagnostics["checks"].values()
540
)
541
diagnostics["overall_status"] = "healthy" if all_passed else "unhealthy"
542
543
return diagnostics
544
545
def continuous_monitoring(self, interval=300):
546
"""Run continuous health monitoring."""
547
while True:
548
try:
549
results = self.run_diagnostics()
550
print(f"Health check: {results['overall_status']}")
551
552
if results["overall_status"] != "healthy":
553
for check_name, check_result in results["checks"].items():
554
if check_result["status"] != "pass":
555
print(f" {check_name}: {check_result['message']}")
556
557
except KeyboardInterrupt:
558
break
559
except Exception as e:
560
print(f"Health monitoring error: {e}")
561
562
time.sleep(interval)
563
564
# Usage
565
monitor = LangfuseHealthMonitor(langfuse)
566
health_report = monitor.run_diagnostics()
567
print(health_report)
568
```
569
570
### Environment-Specific Configuration
571
572
```python
573
import os
574
from typing import Optional
575
576
class LangfuseFactory:
577
"""Factory for creating environment-appropriate Langfuse clients."""
578
579
@classmethod
580
def create_for_environment(cls, environment: Optional[str] = None) -> Langfuse:
581
"""Create Langfuse client configured for specific environment."""
582
583
env = environment or os.getenv("ENVIRONMENT", "development")
584
585
configs = {
586
"development": {
587
"tracing_enabled": True,
588
"sample_rate": 1.0,
589
"flush_at": 1, # Immediate flushing for dev
590
"timeout": 10
591
},
592
"staging": {
593
"tracing_enabled": True,
594
"sample_rate": 0.5,
595
"flush_at": 10,
596
"timeout": 20
597
},
598
"production": {
599
"tracing_enabled": True,
600
"sample_rate": 0.1, # Lower sampling in prod
601
"flush_at": 50,
602
"timeout": 30,
603
"additional_headers": {"X-Service": "ai-service"}
604
},
605
"test": {
606
"tracing_enabled": False # Disable for tests
607
}
608
}
609
610
config = configs.get(env, configs["development"])
611
612
return Langfuse(
613
environment=env,
614
**config
615
)
616
617
# Usage
618
langfuse = LangfuseFactory.create_for_environment()
619
620
# Override for specific cases
621
production_client = LangfuseFactory.create_for_environment("production")
622
```