# Session Management

Session-based configuration and resource management for distributed computing. Handles catalog connections, temporary tables, execution settings, and provides a unified context for all Daft operations.

## Capabilities

### Session Interface

Core session management for distributed DataFrame operations.

```python { .api }
11
class Session:
12
"""Main session class for distributed computing configuration."""
13
14
def __init__(self): ...
15
16
def get_catalog(self, name: str) -> Catalog:
17
"""Get attached catalog by name."""
18
19
def list_catalogs(self) -> List[str]:
20
"""List all attached catalogs."""
21
22
def attach_catalog(self, name: str, catalog: Catalog) -> None:
23
"""Attach catalog to session."""
24
25
def detach_catalog(self, name: str) -> None:
26
"""Detach catalog from session."""
27
28
def current_session() -> Session:
29
"""
30
Get current session instance.
31
32
Returns:
33
Session: Current active session
34
"""
35
36
def set_session(session: Session) -> None:
37
"""
38
Set current session.
39
40
Parameters:
41
- session: Session instance to make current
42
"""
43
44
def session() -> Session:
45
"""
46
Get or create session instance.
47
48
Returns:
49
Session: Session instance (creates if none exists)
50
"""
51
```
52
53
### Catalog Management
54
55
Attach and manage data catalogs within sessions.
56
57
```python { .api }
58
def attach_catalog(name: str, catalog: Catalog) -> None:
59
"""
60
Attach catalog to current session.
61
62
Parameters:
63
- name: Catalog name for reference
64
- catalog: Catalog instance to attach
65
"""
66
67
def detach_catalog(name: str) -> None:
68
"""
69
Detach catalog from current session.
70
71
Parameters:
72
- name: Name of catalog to detach
73
"""
74
75
def current_catalog() -> str:
76
"""
77
Get current catalog name.
78
79
Returns:
80
str: Name of current catalog
81
"""
82
83
def set_catalog(name: str) -> None:
84
"""
85
Set current catalog.
86
87
Parameters:
88
- name: Name of catalog to make current
89
"""
90
91
def get_catalog(name: str) -> Catalog:
92
"""
93
Get catalog by name.
94
95
Parameters:
96
- name: Catalog name
97
98
Returns:
99
Catalog: Catalog instance
100
"""
101
102
def has_catalog(name: str) -> bool:
103
"""
104
Check if catalog exists in session.
105
106
Parameters:
107
- name: Catalog name to check
108
109
Returns:
110
bool: True if catalog exists
111
"""
112
113
def list_catalogs() -> List[str]:
114
"""
115
List all attached catalogs.
116
117
Returns:
118
List[str]: List of catalog names
119
"""
120
```
121
122
### Namespace Management
123
124
Manage catalog namespaces within the session context.
125
126
```python { .api }
127
def create_namespace(name: str) -> None:
128
"""
129
Create namespace in current catalog.
130
131
Parameters:
132
- name: Namespace name to create
133
"""
134
135
def create_namespace_if_not_exists(name: str) -> None:
136
"""
137
Create namespace if it doesn't exist.
138
139
Parameters:
140
- name: Namespace name
141
"""
142
143
def drop_namespace(name: str) -> None:
144
"""
145
Drop namespace from current catalog.
146
147
Parameters:
148
- name: Namespace name to drop
149
"""
150
151
def current_namespace() -> str:
152
"""
153
Get current namespace name.
154
155
Returns:
156
str: Current namespace name
157
"""
158
159
def set_namespace(name: str) -> None:
160
"""
161
Set current namespace.
162
163
Parameters:
164
- name: Namespace name to set as current
165
"""
166
167
def has_namespace(name: str) -> bool:
168
"""
169
Check if namespace exists.
170
171
Parameters:
172
- name: Namespace name to check
173
174
Returns:
175
bool: True if namespace exists
176
"""
177
```
178
179
### Table Management
180
181
Register and manage temporary tables and catalog tables.
182
183
```python { .api }
184
def attach_table(df: DataFrame, name: str) -> None:
185
"""
186
Attach DataFrame as temporary table.
187
188
Parameters:
189
- df: DataFrame to attach
190
- name: Table name for reference
191
"""
192
193
def detach_table(name: str) -> None:
194
"""
195
Detach temporary table.
196
197
Parameters:
198
- name: Table name to detach
199
"""
200
201
def create_table(name: str, source: Union[Schema, DataFrame]) -> Table:
202
"""
203
Create table in current catalog.
204
205
Parameters:
206
- name: Table name
207
- source: Schema or DataFrame to create table from
208
209
Returns:
210
Table: Created table instance
211
"""
212
213
def create_table_if_not_exists(name: str, source: Union[Schema, DataFrame]) -> Table:
214
"""
215
Create table if it doesn't exist.
216
217
Parameters:
218
- name: Table name
219
- source: Schema or DataFrame
220
221
Returns:
222
Table: Table instance (existing or newly created)
223
"""
224
225
def create_temp_table(name: str, df: DataFrame) -> None:
226
"""
227
Create temporary table from DataFrame.
228
229
Parameters:
230
- name: Temporary table name
231
- df: DataFrame to use as table data
232
"""
233
234
def drop_table(name: str) -> None:
235
"""
236
Drop table from current catalog.
237
238
Parameters:
239
- name: Table name to drop
240
"""
241
242
def get_table(name: str) -> Table:
243
"""
244
Get table by name.
245
246
Parameters:
247
- name: Table name
248
249
Returns:
250
Table: Table instance
251
"""
252
253
def has_table(name: str) -> bool:
254
"""
255
Check if table exists.
256
257
Parameters:
258
- name: Table name to check
259
260
Returns:
261
bool: True if table exists
262
"""
263
264
def list_tables() -> List[str]:
265
"""
266
List all available tables.
267
268
Returns:
269
List[str]: List of table names
270
"""
271
272
def read_table(name: str, **options: Any) -> DataFrame:
273
"""
274
Read table as DataFrame.
275
276
Parameters:
277
- name: Table name to read
278
- options: Additional read options
279
280
Returns:
281
DataFrame: Table data as DataFrame
282
"""
283
284
def write_table(name: str, df: DataFrame, **options: Any) -> None:
285
"""
286
Write DataFrame to table.
287
288
Parameters:
289
- name: Table name
290
- df: DataFrame to write
291
- options: Additional write options
292
"""
293
```
294
295
### Provider Management
296
297
Manage data providers and external service connections.
298
299
```python { .api }
300
def attach_provider(name: str, provider: Any) -> None:
301
"""
302
Attach data provider to session.
303
304
Parameters:
305
- name: Provider name
306
- provider: Provider instance
307
"""
308
309
def detach_provider(name: str) -> None:
310
"""
311
Detach data provider.
312
313
Parameters:
314
- name: Provider name to detach
315
"""
316
317
def current_provider() -> str:
318
"""
319
Get current provider name.
320
321
Returns:
322
str: Current provider name
323
"""
324
325
def set_provider(name: str) -> None:
326
"""
327
Set current provider.
328
329
Parameters:
330
- name: Provider name to set as current
331
"""
332
333
def get_provider(name: str) -> Any:
334
"""
335
Get provider by name.
336
337
Parameters:
338
- name: Provider name
339
340
Returns:
341
Any: Provider instance
342
"""
343
344
def has_provider(name: str) -> bool:
345
"""
346
Check if provider exists.
347
348
Parameters:
349
- name: Provider name to check
350
351
Returns:
352
bool: True if provider exists
353
"""
354
```
355
356
### Function Management
357
358
Register custom functions for use across the session.
359
360
```python { .api }
361
def attach_function(name: str, func: Callable) -> None:
362
"""
363
Attach function to session for global use.
364
365
Parameters:
366
- name: Function name for reference
367
- func: Callable function to attach
368
"""
369
370
def detach_function(name: str) -> None:
371
"""
372
Detach function from session.
373
374
Parameters:
375
- name: Function name to detach
376
"""
377
```
378
379
### Model Management
380
381
Manage AI/ML models within the session context.
382
383
```python { .api }
384
def current_model() -> str:
385
"""
386
Get current model name.
387
388
Returns:
389
str: Current model name
390
"""
391
392
def set_model(name: str) -> None:
393
"""
394
Set current model for AI operations.
395
396
Parameters:
397
- name: Model name to set as current
398
"""
399
```
400
401
### Configuration Management
402
403
Configure execution and planning settings for the session.
404
405
```python { .api }
406
def set_execution_config(config: ExecutionConfig) -> None:
407
"""
408
Set execution configuration for the session.
409
410
Parameters:
411
- config: Execution configuration settings
412
"""
413
414
def set_planning_config(config: PlanningConfig) -> None:
415
"""
416
Set query planning configuration.
417
418
Parameters:
419
- config: Planning configuration settings
420
"""
421
422
def execution_config_ctx(config: ExecutionConfig) -> ContextManager:
423
"""
424
Context manager for temporary execution config.
425
426
Parameters:
427
- config: Temporary execution configuration
428
429
Returns:
430
ContextManager: Context manager for config scope
431
"""
432
433
def planning_config_ctx(config: PlanningConfig) -> ContextManager:
434
"""
435
Context manager for temporary planning config.
436
437
Parameters:
438
- config: Temporary planning configuration
439
440
Returns:
441
ContextManager: Context manager for config scope
442
"""
443
```
444
445
### General Attachment
446
447
Generic attachment mechanism for session objects.
448
449
```python { .api }
450
def attach(obj: Any, name: str) -> None:
451
"""
452
Attach generic object to session.
453
454
Parameters:
455
- obj: Object to attach
456
- name: Name for reference
457
"""
458
```
459
460
## Usage Examples
461
462
### Basic Session Setup
463
```python
464
import daft
465
from daft.catalog import Catalog
466
467
# Create or get current session
468
session = daft.current_session()
469
470
# Create catalogs
sales_catalog = Catalog.from_pydict({
    "customers": {"id": [1, 2, 3], "name": ["A", "B", "C"]},
    "orders": {"order_id": [101, 102], "customer_id": [1, 2], "product_id": [1, 2]}
})
475
476
inventory_catalog = Catalog.from_pydict({
477
"products": {"id": [1, 2], "name": ["Widget", "Gadget"]}
478
})
479
480
# Attach catalogs to session
481
daft.attach_catalog("sales", sales_catalog)
482
daft.attach_catalog("inventory", inventory_catalog)
483
484
# List available catalogs
485
print(f"Available catalogs: {daft.list_catalogs()}")
486
```
487
488
### Working with Multiple Catalogs
489
```python
490
# Set current catalog
491
daft.set_catalog("sales")
492
493
# Work with tables in current catalog
494
customers_df = daft.read_table("customers")
495
orders_df = daft.read_table("orders")
496
497
# Switch to different catalog
498
daft.set_catalog("inventory")
499
products_df = daft.read_table("products")
500

# Join data across catalogs
result = customers_df.join(
    orders_df,
    on=daft.col("id") == daft.col("customer_id")
).join(
    products_df.rename({"id": "product_id"}),
    on="product_id"  # join orders' product_id against the renamed products key
)
```
510
511
### Temporary Tables and SQL
512
```python
513
# Create temporary table
514
temp_data = daft.from_pydict({
515
"region": ["North", "South", "East", "West"],
516
"sales": [1000, 1500, 1200, 800]
517
})
518
519
daft.attach_table(temp_data, "regional_sales")
520
521
# Use in SQL queries
522
sql_result = daft.sql("""
523
SELECT r.region, r.sales, c.name
524
FROM regional_sales r
525
JOIN sales.customers c ON c.id <= 2
526
ORDER BY r.sales DESC
527
""")
528
529
# Clean up temporary table
530
daft.detach_table("regional_sales")
531
```
532
533
### Session Configuration
534
```python
535
from daft.context import ExecutionConfig, PlanningConfig
536
537
# Configure execution settings
538
exec_config = ExecutionConfig(
539
default_morsel_size=1000000,
540
num_scan_tasks=16
541
)
542
daft.set_execution_config(exec_config)
543
544
# Configure planning settings
545
plan_config = PlanningConfig(
546
broadcast_join_size_threshold=100000
547
)
548
daft.set_planning_config(plan_config)
549
550
# Use temporary configuration
551
with daft.execution_config_ctx(ExecutionConfig(num_scan_tasks=32)):
552
# This operation uses 32 scan tasks
553
large_df = daft.read_parquet("s3://bucket/large-dataset/*.parquet")
554
result = large_df.groupby("category").count().collect()
555
```
556
557
### Function Registration
558
```python
559
# Define custom function
560
@daft.func
561
def custom_transform(value: str) -> str:
562
return value.upper() + "_PROCESSED"
563
564
# Register function globally
565
daft.attach_function("global_transform", custom_transform)
566
567
# Use registered function in SQL
568
sql_with_udf = daft.sql("""
569
SELECT name, global_transform(name) as processed_name
570
FROM sales.customers
571
""")
572
573
# Detach when no longer needed
574
daft.detach_function("global_transform")
575
```
576
577
### Multi-Environment Session Management
578
```python
579
def setup_development_session():
580
"""Setup session for development environment."""
581
# Development catalogs with sample data
582
dev_catalog = Catalog.from_pydict({
583
"users": {"id": [1, 2], "name": ["Dev User 1", "Dev User 2"]}
584
})
585
586
daft.attach_catalog("main", dev_catalog)
587
daft.set_catalog("main")
588
589
# Development-specific configuration
590
daft.set_execution_config(ExecutionConfig(
591
default_morsel_size=10000 # Smaller for dev
592
))
593
594
def setup_production_session():
595
"""Setup session for production environment."""
596
# Production catalogs from external systems
597
from pyiceberg.catalog import load_catalog
598
599
prod_iceberg = load_catalog("prod_catalog")
600
prod_catalog = Catalog.from_iceberg(prod_iceberg)
601
602
daft.attach_catalog("main", prod_catalog)
603
daft.set_catalog("main")
604
605
# Production-optimized configuration
606
daft.set_execution_config(ExecutionConfig(
607
default_morsel_size=10000000 # Larger for production
608
))
609
610
# Environment-specific setup
611
import os
612
if os.getenv("ENVIRONMENT") == "production":
613
setup_production_session()
614
else:
615
setup_development_session()
616
```
617
618
### Advanced Session Management
619
```python
620
class DataPipeline:
621
def __init__(self):
622
self.session = daft.session()
623
self.setup_catalogs()
624
self.register_functions()
625
626
def setup_catalogs(self):
627
"""Setup all required catalogs."""
628
# Raw data catalog
629
raw_catalog = Catalog.from_s3tables("arn:aws:s3:::raw-data-bucket")
630
daft.attach_catalog("raw", raw_catalog)
631
632
# Processed data catalog
633
processed_catalog = Catalog.from_unity(unity_client)
634
daft.attach_catalog("processed", processed_catalog)
635
636
# Set default catalog
637
daft.set_catalog("processed")
638
639
def register_functions(self):
640
"""Register pipeline-specific functions."""
641
@daft.func
642
def clean_text(text: str) -> str:
643
return text.strip().lower()
644
645
@daft.func
646
def validate_email(email: str) -> bool:
647
return "@" in email and "." in email
648
649
daft.attach_function("clean_text", clean_text)
650
daft.attach_function("validate_email", validate_email)
651
652
def run_pipeline(self):
653
"""Execute data pipeline."""
654
# Read from raw data
655
daft.set_catalog("raw")
656
raw_df = daft.read_table("user_data")
657
658
# Process data
659
cleaned_df = raw_df.select(
660
daft.col("id"),
661
daft.sql_expr("clean_text(name)").alias("name"),
662
daft.col("email")
663
).filter(
664
daft.sql_expr("validate_email(email)")
665
)
666
667
# Write to processed catalog
668
daft.set_catalog("processed")
669
daft.write_table("clean_users", cleaned_df)
670
671
def cleanup(self):
672
"""Clean up session resources."""
673
daft.detach_function("clean_text")
674
daft.detach_function("validate_email")
675
daft.detach_catalog("raw")
676
daft.detach_catalog("processed")
677
678
# Use pipeline
679
pipeline = DataPipeline()
680
try:
681
pipeline.run_pipeline()
682
finally:
683
pipeline.cleanup()
684
```
685
686
### Session State Inspection
687
```python
688
def inspect_session():
689
"""Inspect current session state."""
690
print(f"Current session: {daft.current_session()}")
691
print(f"Current catalog: {daft.current_catalog()}")
692
print(f"Current namespace: {daft.current_namespace()}")
693
print(f"Available catalogs: {daft.list_catalogs()}")
694
print(f"Available tables: {daft.list_tables()}")
695
696
# Check specific resources
697
if daft.has_catalog("main"):
698
print("Main catalog is available")
699
700
if daft.has_table("users"):
701
print("Users table is available")
702
703
# Inspect current state
704
inspect_session()
705
```
706
707
### Error Handling and Recovery
708
```python
709
def safe_session_operation():
710
"""Perform session operations with error handling."""
711
try:
712
# Setup catalogs
713
catalog = Catalog.from_iceberg(iceberg_instance)
714
daft.attach_catalog("main", catalog)
715
716
# Perform operations
717
df = daft.read_table("main.sales.transactions")
718
result = df.groupby("region").sum("amount").collect()
719
720
return result
721
722
except Exception as e:
723
print(f"Session operation failed: {e}")
724
725
# Cleanup on error
726
if daft.has_catalog("main"):
727
daft.detach_catalog("main")
728
729
raise
730
731
finally:
732
# Ensure cleanup
733
print("Session operation completed")
734
735
# Safe execution
736
try:
737
result = safe_session_operation()
738
except Exception:
739
print("Operation failed, session cleaned up")
740
```
Session management in Daft provides a comprehensive framework for organizing and coordinating distributed data operations across multiple catalogs, tables, and computational resources with proper lifecycle management and configuration control.