# Session Management

Session-based configuration and resource management for distributed computing. A session handles catalog connections, temporary tables, and execution settings, and provides a unified context for all Daft operations.

## Capabilities

### Session Interface

Core session management for distributed DataFrame operations.

```python { .api }
class Session:
    """Main session class for distributed computing configuration."""

    def __init__(self): ...

    def get_catalog(self, name: str) -> Catalog:
        """Get attached catalog by name."""

    def list_catalogs(self) -> List[str]:
        """List all attached catalogs."""

    def attach_catalog(self, name: str, catalog: Catalog) -> None:
        """Attach catalog to session."""

    def detach_catalog(self, name: str) -> None:
        """Detach catalog from session."""

def current_session() -> Session:
    """
    Get current session instance.

    Returns:
        Session: Current active session
    """

def set_session(session: Session) -> None:
    """
    Set current session.

    Parameters:
    - session: Session instance to make current
    """

def session() -> Session:
    """
    Get or create session instance.

    Returns:
        Session: Session instance (creates if none exists)
    """
```
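
A minimal sketch of the session-level methods, assuming an in-memory catalog built with `Catalog.from_pydict` as in the usage examples below; names and data are illustrative:

```python
import daft
from daft.catalog import Catalog

# Get (or lazily create) the active session.
sess = daft.current_session()

# Attach a small in-memory catalog under a name, then look it up.
demo_catalog = Catalog.from_pydict({"events": {"id": [1, 2, 3]}})
sess.attach_catalog("demo", demo_catalog)

print(sess.list_catalogs())        # includes 'demo'
retrieved = sess.get_catalog("demo")

# Detach when finished so the session stays clean.
sess.detach_catalog("demo")
```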

### Catalog Management

Attach and manage data catalogs within sessions.

```python { .api }
def attach_catalog(name: str, catalog: Catalog) -> None:
    """
    Attach catalog to current session.

    Parameters:
    - name: Catalog name for reference
    - catalog: Catalog instance to attach
    """

def detach_catalog(name: str) -> None:
    """
    Detach catalog from current session.

    Parameters:
    - name: Name of catalog to detach
    """

def current_catalog() -> str:
    """
    Get current catalog name.

    Returns:
        str: Name of current catalog
    """

def set_catalog(name: str) -> None:
    """
    Set current catalog.

    Parameters:
    - name: Name of catalog to make current
    """

def get_catalog(name: str) -> Catalog:
    """
    Get catalog by name.

    Parameters:
    - name: Catalog name

    Returns:
        Catalog: Catalog instance
    """

def has_catalog(name: str) -> bool:
    """
    Check if catalog exists in session.

    Parameters:
    - name: Catalog name to check

    Returns:
        bool: True if catalog exists
    """

def list_catalogs() -> List[str]:
    """
    List all attached catalogs.

    Returns:
        List[str]: List of catalog names
    """
```
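
A compact sketch of the module-level catalog helpers; the catalog name and data are illustrative:

```python
import daft
from daft.catalog import Catalog

# Attach an in-memory catalog (illustrative) and make it current.
daft.attach_catalog("analytics", Catalog.from_pydict({"events": {"id": [1, 2]}}))
daft.set_catalog("analytics")

print(daft.current_catalog())   # 'analytics'
print(daft.list_catalogs())     # includes 'analytics'

# Guard lookups before fetching the catalog object.
if daft.has_catalog("analytics"):
    analytics = daft.get_catalog("analytics")

daft.detach_catalog("analytics")
```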

### Namespace Management

Manage catalog namespaces within the session context.

```python { .api }
def create_namespace(name: str) -> None:
    """
    Create namespace in current catalog.

    Parameters:
    - name: Namespace name to create
    """

def create_namespace_if_not_exists(name: str) -> None:
    """
    Create namespace if it doesn't exist.

    Parameters:
    - name: Namespace name
    """

def drop_namespace(name: str) -> None:
    """
    Drop namespace from current catalog.

    Parameters:
    - name: Namespace name to drop
    """

def current_namespace() -> str:
    """
    Get current namespace name.

    Returns:
        str: Current namespace name
    """

def set_namespace(name: str) -> None:
    """
    Set current namespace.

    Parameters:
    - name: Namespace name to set as current
    """

def has_namespace(name: str) -> bool:
    """
    Check if namespace exists.

    Parameters:
    - name: Namespace name to check

    Returns:
        bool: True if namespace exists
    """
```
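
A minimal namespace workflow, assuming the current catalog supports namespaces; the namespace name is illustrative:

```python
import daft

# Create the namespace only if it is missing, then make it current.
daft.create_namespace_if_not_exists("staging")
daft.set_namespace("staging")
print(daft.current_namespace())   # 'staging'

# Tear it down later once nothing references it.
if daft.has_namespace("staging"):
    daft.drop_namespace("staging")
```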

### Table Management

Register and manage temporary tables and catalog tables.

```python { .api }
def attach_table(df: DataFrame, name: str) -> None:
    """
    Attach DataFrame as temporary table.

    Parameters:
    - df: DataFrame to attach
    - name: Table name for reference
    """

def detach_table(name: str) -> None:
    """
    Detach temporary table.

    Parameters:
    - name: Table name to detach
    """

def create_table(name: str, source: Union[Schema, DataFrame]) -> Table:
    """
    Create table in current catalog.

    Parameters:
    - name: Table name
    - source: Schema or DataFrame to create table from

    Returns:
        Table: Created table instance
    """

def create_table_if_not_exists(name: str, source: Union[Schema, DataFrame]) -> Table:
    """
    Create table if it doesn't exist.

    Parameters:
    - name: Table name
    - source: Schema or DataFrame

    Returns:
        Table: Table instance (existing or newly created)
    """

def create_temp_table(name: str, df: DataFrame) -> None:
    """
    Create temporary table from DataFrame.

    Parameters:
    - name: Temporary table name
    - df: DataFrame to use as table data
    """

def drop_table(name: str) -> None:
    """
    Drop table from current catalog.

    Parameters:
    - name: Table name to drop
    """

def get_table(name: str) -> Table:
    """
    Get table by name.

    Parameters:
    - name: Table name

    Returns:
        Table: Table instance
    """

def has_table(name: str) -> bool:
    """
    Check if table exists.

    Parameters:
    - name: Table name to check

    Returns:
        bool: True if table exists
    """

def list_tables() -> List[str]:
    """
    List all available tables.

    Returns:
        List[str]: List of table names
    """

def read_table(name: str, **options: Any) -> DataFrame:
    """
    Read table as DataFrame.

    Parameters:
    - name: Table name to read
    - options: Additional read options

    Returns:
        DataFrame: Table data as DataFrame
    """

def write_table(name: str, df: DataFrame, **options: Any) -> None:
    """
    Write DataFrame to table.

    Parameters:
    - name: Table name
    - df: DataFrame to write
    - options: Additional write options
    """
```
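
A brief round-trip through the table helpers, assuming the current catalog supports table creation and writes; the table name and data are illustrative:

```python
import daft

df = daft.from_pydict({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})

# Create the catalog table from the DataFrame only if it is missing,
# then write to it and read it back.
daft.create_table_if_not_exists("payments", df)
daft.write_table("payments", df)

if daft.has_table("payments"):
    payments = daft.read_table("payments")
    payments.show()

# Drop it once the experiment is over.
daft.drop_table("payments")
```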

### Provider Management

Manage data providers and external service connections.

```python { .api }
def attach_provider(name: str, provider: Any) -> None:
    """
    Attach data provider to session.

    Parameters:
    - name: Provider name
    - provider: Provider instance
    """

def detach_provider(name: str) -> None:
    """
    Detach data provider.

    Parameters:
    - name: Provider name to detach
    """

def current_provider() -> str:
    """
    Get current provider name.

    Returns:
        str: Current provider name
    """

def set_provider(name: str) -> None:
    """
    Set current provider.

    Parameters:
    - name: Provider name to set as current
    """

def get_provider(name: str) -> Any:
    """
    Get provider by name.

    Parameters:
    - name: Provider name

    Returns:
        Any: Provider instance
    """

def has_provider(name: str) -> bool:
    """
    Check if provider exists.

    Parameters:
    - name: Provider name to check

    Returns:
        bool: True if provider exists
    """
```
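
A minimal sketch of the provider lifecycle. The API above takes the provider as an opaque `Any`, so `my_provider` is a placeholder for whatever provider object your integration supplies:

```python
import daft

# Placeholder standing in for a real provider instance.
my_provider = object()

daft.attach_provider("primary", my_provider)
daft.set_provider("primary")

print(daft.current_provider())                 # 'primary'
assert daft.get_provider("primary") is my_provider

if daft.has_provider("primary"):
    daft.detach_provider("primary")
```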

### Function Management

Register custom functions for use across the session.

```python { .api }
def attach_function(name: str, func: Callable) -> None:
    """
    Attach function to session for global use.

    Parameters:
    - name: Function name for reference
    - func: Callable function to attach
    """

def detach_function(name: str) -> None:
    """
    Detach function from session.

    Parameters:
    - name: Function name to detach
    """
```

### Model Management

Manage AI/ML models within the session context.

```python { .api }
def current_model() -> str:
    """
    Get current model name.

    Returns:
        str: Current model name
    """

def set_model(name: str) -> None:
    """
    Set current model for AI operations.

    Parameters:
    - name: Model name to set as current
    """
```
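
A two-line sketch of model selection; the model name is a placeholder, and the set of valid names depends on the provider configured for AI operations:

```python
import daft

daft.set_model("my-embedding-model")   # placeholder model name
print(daft.current_model())            # 'my-embedding-model'
```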

### Configuration Management

Configure execution and planning settings for the session.

```python { .api }
def set_execution_config(config: ExecutionConfig) -> None:
    """
    Set execution configuration for the session.

    Parameters:
    - config: Execution configuration settings
    """

def set_planning_config(config: PlanningConfig) -> None:
    """
    Set query planning configuration.

    Parameters:
    - config: Planning configuration settings
    """

def execution_config_ctx(config: ExecutionConfig) -> ContextManager:
    """
    Context manager for temporary execution config.

    Parameters:
    - config: Temporary execution configuration

    Returns:
        ContextManager: Context manager for config scope
    """

def planning_config_ctx(config: PlanningConfig) -> ContextManager:
    """
    Context manager for temporary planning config.

    Parameters:
    - config: Temporary planning configuration

    Returns:
        ContextManager: Context manager for config scope
    """
```

### General Attachment

Generic attachment mechanism for session objects.

```python { .api }
def attach(obj: Any, name: str) -> None:
    """
    Attach generic object to session.

    Parameters:
    - obj: Object to attach
    - name: Name for reference
    """
```
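
A minimal sketch of the generic hook; here the attached object is an in-memory catalog (illustrative), but any object the session knows how to attach can be registered the same way:

```python
import daft
from daft.catalog import Catalog

demo = Catalog.from_pydict({"events": {"id": [1, 2, 3]}})

# Register the object under a name on the current session.
daft.attach(demo, "demo")
```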

## Usage Examples

### Basic Session Setup

```python
import daft
from daft.catalog import Catalog

# Create or get current session
session = daft.current_session()

# Create catalogs (orders reference products in the inventory catalog by product_id)
sales_catalog = Catalog.from_pydict({
    "customers": {"id": [1, 2, 3], "name": ["A", "B", "C"]},
    "orders": {"order_id": [101, 102], "customer_id": [1, 2], "product_id": [1, 2]}
})

inventory_catalog = Catalog.from_pydict({
    "products": {"id": [1, 2], "name": ["Widget", "Gadget"]}
})

# Attach catalogs to session
daft.attach_catalog("sales", sales_catalog)
daft.attach_catalog("inventory", inventory_catalog)

# List available catalogs
print(f"Available catalogs: {daft.list_catalogs()}")
```

### Working with Multiple Catalogs

```python
# Set current catalog
daft.set_catalog("sales")

# Work with tables in the current catalog
customers_df = daft.read_table("customers")
orders_df = daft.read_table("orders")

# Switch to a different catalog
daft.set_catalog("inventory")
products_df = daft.read_table("products")

# Join data across catalogs: orders reference products by product_id
products_renamed = products_df.select(
    daft.col("id").alias("product_id"),
    daft.col("name").alias("product_name"),
)

result = (
    customers_df
    .join(orders_df, left_on="id", right_on="customer_id")
    .join(products_renamed, on="product_id")
)
```

### Temporary Tables and SQL

```python
# Create temporary table
temp_data = daft.from_pydict({
    "region": ["North", "South", "East", "West"],
    "sales": [1000, 1500, 1200, 800]
})

daft.attach_table(temp_data, "regional_sales")

# Use in SQL queries
sql_result = daft.sql("""
    SELECT r.region, r.sales, c.name
    FROM regional_sales r
    JOIN sales.customers c ON c.id <= 2
    ORDER BY r.sales DESC
""")

# Clean up temporary table
daft.detach_table("regional_sales")
```

### Session Configuration

```python
from daft.context import ExecutionConfig, PlanningConfig

# Configure execution settings
exec_config = ExecutionConfig(
    default_morsel_size=1000000,
    num_scan_tasks=16
)
daft.set_execution_config(exec_config)

# Configure planning settings
plan_config = PlanningConfig(
    broadcast_join_size_threshold=100000
)
daft.set_planning_config(plan_config)

# Use temporary configuration
with daft.execution_config_ctx(ExecutionConfig(num_scan_tasks=32)):
    # This operation uses 32 scan tasks
    large_df = daft.read_parquet("s3://bucket/large-dataset/*.parquet")
    result = large_df.groupby("category").count().collect()
```

### Function Registration

```python
# Define custom function
@daft.func
def custom_transform(value: str) -> str:
    return value.upper() + "_PROCESSED"

# Register function globally
daft.attach_function("global_transform", custom_transform)

# Use registered function in SQL
sql_with_udf = daft.sql("""
    SELECT name, global_transform(name) AS processed_name
    FROM sales.customers
""")

# Detach when no longer needed
daft.detach_function("global_transform")
```

### Multi-Environment Session Management

```python
import os

def setup_development_session():
    """Set up the session for the development environment."""
    # Development catalog with sample data
    dev_catalog = Catalog.from_pydict({
        "users": {"id": [1, 2], "name": ["Dev User 1", "Dev User 2"]}
    })

    daft.attach_catalog("main", dev_catalog)
    daft.set_catalog("main")

    # Development-specific configuration
    daft.set_execution_config(ExecutionConfig(
        default_morsel_size=10000  # Smaller for dev
    ))

def setup_production_session():
    """Set up the session for the production environment."""
    # Production catalogs from external systems
    from pyiceberg.catalog import load_catalog

    prod_iceberg = load_catalog("prod_catalog")
    prod_catalog = Catalog.from_iceberg(prod_iceberg)

    daft.attach_catalog("main", prod_catalog)
    daft.set_catalog("main")

    # Production-optimized configuration
    daft.set_execution_config(ExecutionConfig(
        default_morsel_size=10000000  # Larger for production
    ))

# Environment-specific setup
if os.getenv("ENVIRONMENT") == "production":
    setup_production_session()
else:
    setup_development_session()
```

### Advanced Session Management

```python
class DataPipeline:
    def __init__(self):
        self.session = daft.session()
        self.setup_catalogs()
        self.register_functions()

    def setup_catalogs(self):
        """Set up all required catalogs."""
        # Raw data catalog
        raw_catalog = Catalog.from_s3tables("arn:aws:s3:::raw-data-bucket")
        daft.attach_catalog("raw", raw_catalog)

        # Processed data catalog (assumes a Unity Catalog client was created elsewhere)
        processed_catalog = Catalog.from_unity(unity_client)
        daft.attach_catalog("processed", processed_catalog)

        # Set default catalog
        daft.set_catalog("processed")

    def register_functions(self):
        """Register pipeline-specific functions."""
        @daft.func
        def clean_text(text: str) -> str:
            return text.strip().lower()

        @daft.func
        def validate_email(email: str) -> bool:
            return "@" in email and "." in email

        daft.attach_function("clean_text", clean_text)
        daft.attach_function("validate_email", validate_email)

    def run_pipeline(self):
        """Execute the data pipeline."""
        # Read from raw data
        daft.set_catalog("raw")
        raw_df = daft.read_table("user_data")

        # Process data
        cleaned_df = raw_df.select(
            daft.col("id"),
            daft.sql_expr("clean_text(name)").alias("name"),
            daft.col("email")
        ).filter(
            daft.sql_expr("validate_email(email)")
        )

        # Write to processed catalog
        daft.set_catalog("processed")
        daft.write_table("clean_users", cleaned_df)

    def cleanup(self):
        """Clean up session resources."""
        daft.detach_function("clean_text")
        daft.detach_function("validate_email")
        daft.detach_catalog("raw")
        daft.detach_catalog("processed")

# Use pipeline
pipeline = DataPipeline()
try:
    pipeline.run_pipeline()
finally:
    pipeline.cleanup()
```

### Session State Inspection

```python
def inspect_session():
    """Inspect current session state."""
    print(f"Current session: {daft.current_session()}")
    print(f"Current catalog: {daft.current_catalog()}")
    print(f"Current namespace: {daft.current_namespace()}")
    print(f"Available catalogs: {daft.list_catalogs()}")
    print(f"Available tables: {daft.list_tables()}")

    # Check specific resources
    if daft.has_catalog("main"):
        print("Main catalog is available")

    if daft.has_table("users"):
        print("Users table is available")

# Inspect current state
inspect_session()
```

### Error Handling and Recovery

```python
def safe_session_operation():
    """Perform session operations with error handling."""
    try:
        # Set up catalogs (assumes an Iceberg catalog instance was loaded elsewhere)
        catalog = Catalog.from_iceberg(iceberg_instance)
        daft.attach_catalog("main", catalog)

        # Perform operations
        df = daft.read_table("main.sales.transactions")
        result = df.groupby("region").sum("amount").collect()

        return result

    except Exception as e:
        print(f"Session operation failed: {e}")

        # Clean up on error
        if daft.has_catalog("main"):
            daft.detach_catalog("main")

        raise

    finally:
        # Runs on success and failure alike
        print("Session operation completed")

# Safe execution
try:
    result = safe_session_operation()
except Exception:
    print("Operation failed, session cleaned up")
```

Session management in Daft provides a comprehensive framework for organizing and coordinating distributed data operations across multiple catalogs, tables, and computational resources, with proper lifecycle management and configuration control.