# Data Input/Output

Daft provides comprehensive data reading and writing across 10+ formats with optimized cloud storage integration. Its I/O layer delivers high performance through intelligent caching, predicate pushdown, and parallel processing.

## Capabilities

### Parquet Files

Read and write Apache Parquet files with columnar optimization and metadata handling.

```python { .api }
def read_parquet(
    path: Union[str, List[str]],
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Parquet files from local filesystem or cloud storage.

    Parameters:
    - path: File path(s) or glob pattern
    - columns: Specific columns to read (all if None)
    - predicate: Filter predicate for pushdown optimization
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from Parquet data
    """
```
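
For example, a minimal read based on the signature above; the path and column names are illustrative:

```python
import daft

# Prune to the columns you need; all columns are read when `columns` is None
df = daft.read_parquet("data/events/*.parquet", columns=["user_id", "event_type"])  # illustrative path
```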

### CSV Files

Read comma-separated value files with flexible parsing options.

```python { .api }
def read_csv(
    path: Union[str, List[str]],
    delimiter: str = ",",
    has_header: bool = True,
    column_names: Optional[List[str]] = None,
    dtype: Optional[Dict[str, DataType]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read CSV files with customizable parsing.

    Parameters:
    - path: File path(s) or glob pattern
    - delimiter: Field separator character
    - has_header: Whether first row contains column names
    - column_names: Explicit column names (overrides header)
    - dtype: Column data type specifications
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from CSV data
    """
```
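
A short sketch using the parameters documented above; the file path, delimiter, and dtype mapping are illustrative:

```python
import daft
from daft import DataType

# Parse a pipe-delimited file and pin one column's type explicitly
df = daft.read_csv(
    "data/orders.psv",  # illustrative path
    delimiter="|",
    has_header=True,
    dtype={"order_id": DataType.int64()},
)
```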

### JSON Files

Read JSON and JSONL (newline-delimited JSON) files.

```python { .api }
def read_json(
    path: Union[str, List[str]],
    schema: Optional[Schema] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read JSON/JSONL files with schema inference.

    Parameters:
    - path: File path(s) or glob pattern
    - schema: Explicit schema (inferred if None)
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from JSON data
    """
```
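
For example, reading newline-delimited JSON with an inferred schema (the path is illustrative):

```python
import daft

# Schema is inferred because no explicit `schema` is passed
df = daft.read_json("logs/*.jsonl")  # illustrative path
```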

### Delta Lake

Read Apache Delta Lake tables with time travel and metadata support.

```python { .api }
def read_deltalake(
    table_uri: str,
    version: Optional[int] = None,
    timestamp: Optional[str] = None,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Delta Lake tables with time travel capability.

    Parameters:
    - table_uri: Delta table URI or path
    - version: Specific table version to read
    - timestamp: Read table as of timestamp
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from Delta table
    """
```
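
A sketch of time travel with the documented parameters; the table URI, version, and timestamp are illustrative, and typically only one of `version` or `timestamp` is supplied:

```python
import daft

# Read an older snapshot by version number
df_v3 = daft.read_deltalake("s3://bucket/delta-table", version=3)

# Or read the table as of a point in time
df_asof = daft.read_deltalake("s3://bucket/delta-table", timestamp="2024-01-01T00:00:00Z")
```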

### Apache Iceberg

Read Apache Iceberg tables with catalog integration.

```python { .api }
def read_iceberg(
    table: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    snapshot_id: Optional[int] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Apache Iceberg tables.

    Parameters:
    - table: Table identifier or path
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - snapshot_id: Specific snapshot to read
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from Iceberg table
    """
```
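
A sketch of the documented interface; the table identifier and snapshot id are placeholders, and real deployments may resolve tables through a catalog instead:

```python
import daft

# Pin the read to a specific snapshot of the table
df = daft.read_iceberg("warehouse.analytics.events", snapshot_id=123456789)  # placeholder identifiers
```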

### Apache Hudi

Read Apache Hudi tables with incremental processing support.

```python { .api }
def read_hudi(
    table_uri: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Apache Hudi tables.

    Parameters:
    - table_uri: Hudi table URI or path
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from Hudi table
    """
```
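
A minimal sketch based on the signature above; the table URI and column list are illustrative:

```python
import daft

df = daft.read_hudi("s3://bucket/hudi-table", columns=["id", "ts"])  # illustrative URI and columns
```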

### Lance Columnar Format

Read Lance columnar format optimized for ML workloads.

```python { .api }
def read_lance(
    uri: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Lance columnar format.

    Parameters:
    - uri: Lance dataset URI
    - columns: Specific columns to read
    - predicate: Filter predicate
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from Lance data
    """
```
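
For example, projecting a subset of columns from a Lance dataset (the URI and column names are illustrative):

```python
import daft

df = daft.read_lance("s3://bucket/embeddings.lance", columns=["id", "vector"])  # illustrative URI
```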

### SQL Databases

Read data from SQL databases with connection management.

```python { .api }
def read_sql(
    sql: str,
    connection_string: str,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read data from SQL databases.

    Parameters:
    - sql: SQL query to execute
    - connection_string: Database connection string
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from SQL query results
    """
```
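
A sketch using the documented parameters; the query and connection string are placeholders:

```python
import daft

df = daft.read_sql(
    "SELECT id, amount FROM orders WHERE amount > 100",  # placeholder query
    "postgresql://user:password@localhost:5432/shop",    # placeholder connection string
)
```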

### HuggingFace Datasets

Read datasets from HuggingFace Hub.

```python { .api }
def read_huggingface(
    path: str,
    split: Optional[str] = None,
    streaming: bool = False,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read HuggingFace datasets.

    Parameters:
    - path: Dataset name or path on HuggingFace Hub
    - split: Dataset split to read (train, test, validation)
    - streaming: Enable streaming mode for large datasets
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from HuggingFace dataset
    """
```
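
For example, streaming one split of a Hub dataset; the dataset name is a placeholder:

```python
import daft

df = daft.read_huggingface("username/dataset-name", split="train", streaming=True)  # placeholder dataset
```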

### Video Frames

Extract frames from video files for computer vision workloads.

```python { .api }
def read_video_frames(
    path: Union[str, List[str]],
    sample_rate: Optional[float] = None,
    frame_count: Optional[int] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read video frames as DataFrame.

    Parameters:
    - path: Video file path(s)
    - sample_rate: Frames per second to extract
    - frame_count: Maximum number of frames
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame with video frame data
    """
```
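
A minimal sketch with an illustrative path; see the Video Processing example below for a fuller pipeline:

```python
import daft

# Sample one frame per second, capped at 50 frames per video
df = daft.read_video_frames("clips/demo.mp4", sample_rate=1.0, frame_count=50)  # illustrative path
```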

### Web Archives (WARC)

Read WARC files for web crawling and archival data.

```python { .api }
def read_warc(
    path: Union[str, List[str]],
    columns: Optional[List[str]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read WARC (Web ARChive) files.

    Parameters:
    - path: WARC file path(s)
    - columns: Specific columns to extract
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from WARC data
    """
```
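
For example, loading crawl records from a WARC archive (the path is illustrative):

```python
import daft

df = daft.read_warc("crawls/segment-00000.warc.gz")  # illustrative path
```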

### MCAP Robotics Format

Read MCAP files for robotics and sensor data.

```python { .api }
def read_mcap(
    path: Union[str, List[str]],
    topics: Optional[List[str]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read MCAP robotics format files.

    Parameters:
    - path: MCAP file path(s)
    - topics: Specific topics to read
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from MCAP data
    """
```
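
A sketch based on the signature above; the file path and topic names are illustrative:

```python
import daft

# Restrict the read to selected topics
df = daft.read_mcap("logs/run_01.mcap", topics=["/camera/image_raw", "/imu/data"])  # illustrative path and topics
```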

### File Path Utilities

Create DataFrames from file system patterns.

```python { .api }
def from_glob_path(
    path: str,
    io_config: Optional[IOConfig] = None
) -> DataFrame:
    """
    Create DataFrame from file glob pattern.

    Parameters:
    - path: Glob pattern for files
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame with file metadata
    """

def range(n: int) -> DataFrame:
    """
    Create DataFrame with range of integers.

    Parameters:
    - n: Number of integers (0 to n-1)

    Returns:
        DataFrame: DataFrame with single integer column
    """
```
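
For example, listing matching files and generating a small integer range (the glob pattern is illustrative):

```python
import daft

# DataFrame of file metadata for every object matching the pattern
files_df = daft.from_glob_path("s3://bucket/raw/**/*.parquet")  # illustrative pattern

# DataFrame with a single integer column containing 0..9
ids_df = daft.range(10)
```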

## IO Configuration

### Core Configuration Classes

```python { .api }
class IOConfig:
    """General IO configuration settings."""
    def __init__(
        self,
        s3: Optional[S3Config] = None,
        azure: Optional[AzureConfig] = None,
        gcs: Optional[GCSConfig] = None,
        http: Optional[HTTPConfig] = None
    ): ...

class S3Config:
    """AWS S3 configuration."""
    def __init__(
        self,
        region_name: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        credentials: Optional[S3Credentials] = None,
        use_ssl: bool = True
    ): ...

class S3Credentials:
    """S3 authentication credentials."""
    def __init__(
        self,
        access_key_id: str,
        secret_access_key: str,
        session_token: Optional[str] = None
    ): ...

class AzureConfig:
    """Azure Blob Storage configuration."""
    def __init__(
        self,
        storage_account: Optional[str] = None,
        access_key: Optional[str] = None,
        sas_token: Optional[str] = None
    ): ...

class GCSConfig:
    """Google Cloud Storage configuration."""
    def __init__(
        self,
        project_id: Optional[str] = None,
        service_account_key: Optional[str] = None
    ): ...

class HTTPConfig:
    """HTTP client configuration."""
    def __init__(
        self,
        timeout: Optional[int] = None,
        max_retries: Optional[int] = None,
        headers: Optional[Dict[str, str]] = None
    ): ...
```
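
A hedged sketch of a non-S3 setup built from the classes above; the project id, key path, and bucket are illustrative:

```python
import daft
from daft.io import IOConfig, GCSConfig

# Point reads at Google Cloud Storage using an explicit service account key
gcs_config = GCSConfig(project_id="my-project", service_account_key="/path/to/key.json")  # illustrative values
io_config = IOConfig(gcs=gcs_config)

df = daft.read_parquet("gs://my-bucket/data/*.parquet", io_config=io_config)  # illustrative bucket
```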

## Usage Examples

### Cloud Storage Access
```python
import daft
from daft.io import IOConfig, S3Config, S3Credentials

# Configure S3 access
s3_config = S3Config(
    region_name="us-west-2",
    credentials=S3Credentials(
        access_key_id="your-key",
        secret_access_key="your-secret"
    )
)
io_config = IOConfig(s3=s3_config)

# Read from S3
df = daft.read_parquet(
    "s3://my-bucket/data/*.parquet",
    io_config=io_config
)
```

### Multi-Format Pipeline
```python
# Read from multiple sources
parquet_df = daft.read_parquet("data/raw/*.parquet")
csv_df = daft.read_csv("data/supplements.csv")
delta_df = daft.read_deltalake("s3://bucket/delta-table")

# Combine data
combined = (
    parquet_df
    .union_all(csv_df)
    .union_all(delta_df)
    .collect()
)
```

### Optimized Reading
```python
from daft import col

# Use predicate pushdown for efficiency
filtered_df = daft.read_parquet(
    "s3://large-dataset/*.parquet",
    columns=["id", "name", "value"],
    predicate=(col("date") >= "2024-01-01") & (col("status") == "active"),
    io_config=io_config
)
```

### Video Processing
```python
# Extract video frames for ML
video_df = daft.read_video_frames(
    "videos/*.mp4",
    sample_rate=1.0,  # 1 frame per second
    frame_count=100   # Max 100 frames per video
)

# Process frames
processed = (
    video_df
    .select("filename", "frame_index", "image_data")
    .filter(col("frame_index") % 10 == 0)  # Every 10th frame
    .collect()
)
```

## Data Sources and Sinks

```python { .api }
class DataSource:
    """Abstract data source interface for custom readers."""

class DataSourceTask:
    """Represents a task for reading data from a source."""

class DataSink:
    """Abstract data sink interface for custom writers."""
```