# Data Input/Output

Daft provides comprehensive data reading and writing across 10+ formats with optimized cloud storage integration, delivering high-performance I/O through caching, predicate pushdown, and parallel processing.

## Capabilities
### Parquet Files

Read and write Apache Parquet files with columnar optimization and metadata handling.

```python { .api }
def read_parquet(
    path: Union[str, List[str]],
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Parquet files from local filesystem or cloud storage.

    Parameters:
    - path: File path(s) or glob pattern
    - columns: Specific columns to read (all if None)
    - predicate: Filter predicate for pushdown optimization
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from Parquet data
    """
```
### CSV Files

Read comma-separated value files with flexible parsing options.

```python { .api }
def read_csv(
    path: Union[str, List[str]],
    delimiter: str = ",",
    has_header: bool = True,
    column_names: Optional[List[str]] = None,
    dtype: Optional[Dict[str, DataType]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read CSV files with customizable parsing.

    Parameters:
    - path: File path(s) or glob pattern
    - delimiter: Field separator character
    - has_header: Whether first row contains column names
    - column_names: Explicit column names (overrides header)
    - dtype: Column data type specifications
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from CSV data
    """
```
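For example, following the signature above, a tab-delimited file with no header row could be read as in this minimal sketch (the file path, column names, and dtypes are illustrative):

```python
import daft
from daft import DataType

# Hypothetical tab-delimited file without a header row
events_df = daft.read_csv(
    "data/events.tsv",
    delimiter="\t",
    has_header=False,
    column_names=["event_id", "timestamp", "value"],
    dtype={"event_id": DataType.int64(), "value": DataType.float64()},
)
```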
### JSON Files

Read JSON and JSONL (newline-delimited JSON) files.

```python { .api }
def read_json(
    path: Union[str, List[str]],
    schema: Optional[Schema] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read JSON/JSONL files with schema inference.

    Parameters:
    - path: File path(s) or glob pattern
    - schema: Explicit schema (inferred if None)
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from JSON data
    """
```
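A minimal sketch of reading newline-delimited JSON logs with an inferred schema (the glob pattern and field names are illustrative):

```python
import daft

# Hypothetical JSONL logs; schema is inferred when not given explicitly
logs_df = daft.read_json("logs/*.jsonl")

# Keep only a few fields after reading
slim_df = logs_df.select("timestamp", "level", "message")
```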
### Delta Lake

Read Apache Delta Lake tables with time travel and metadata support.

```python { .api }
def read_deltalake(
    table_uri: str,
    version: Optional[int] = None,
    timestamp: Optional[str] = None,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Delta Lake tables with time travel capability.

    Parameters:
    - table_uri: Delta table URI or path
    - version: Specific table version to read
    - timestamp: Read table as of timestamp
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - io_config: IO configuration for cloud storage

    Returns:
        DataFrame: DataFrame from Delta table
    """
```
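A short sketch of time travel against a hypothetical Delta table, using the `version` parameter documented above:

```python
import daft

# Hypothetical Delta table; omit version/timestamp to read the latest snapshot
latest_df = daft.read_deltalake("s3://my-bucket/events-delta")

# Time travel: read an earlier version of the same table
v3_df = daft.read_deltalake("s3://my-bucket/events-delta", version=3)
```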
### Apache Iceberg

Read Apache Iceberg tables with catalog integration.

```python { .api }
def read_iceberg(
    table: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    snapshot_id: Optional[int] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Apache Iceberg tables.

    Parameters:
    - table: Table identifier or path
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - snapshot_id: Specific snapshot to read
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from Iceberg table
    """
```
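A sketch of reading a hypothetical Iceberg table with column pruning and a pushed-down filter, per the signature above (the table identifier is a placeholder):

```python
import daft
from daft import col

# Hypothetical Iceberg table identifier
orders_df = daft.read_iceberg(
    "analytics.orders",
    columns=["order_id", "amount"],
    predicate=col("amount") > 100,
    # snapshot_id=...,  # optionally pin a specific historical snapshot
)
```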
### Apache Hudi

Read Apache Hudi tables with incremental processing support.

```python { .api }
def read_hudi(
    table_uri: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Apache Hudi tables.

    Parameters:
    - table_uri: Hudi table URI or path
    - columns: Specific columns to read
    - predicate: Filter predicate for optimization
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from Hudi table
    """
```
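A sketch of reading a hypothetical Hudi table with column pruning and a filter predicate:

```python
import daft
from daft import col

# Hypothetical Hudi table URI
rides_df = daft.read_hudi(
    "s3://my-bucket/hudi/rides",
    columns=["ride_id", "fare", "city"],
    predicate=col("city") == "SF",
)
```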
### Lance Columnar Format

Read Lance columnar format optimized for ML workloads.

```python { .api }
def read_lance(
    uri: str,
    columns: Optional[List[str]] = None,
    predicate: Optional[Expression] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read Lance columnar format.

    Parameters:
    - uri: Lance dataset URI
    - columns: Specific columns to read
    - predicate: Filter predicate
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from Lance data
    """
```
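A sketch of loading embeddings from a hypothetical Lance dataset:

```python
import daft

# Hypothetical Lance dataset holding vectors for an ML workload
emb_df = daft.read_lance(
    "s3://my-bucket/embeddings.lance",
    columns=["id", "embedding"],
)
```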
### SQL Databases

Read data from SQL databases with connection management.

```python { .api }
def read_sql(
    sql: str,
    connection_string: str,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read data from SQL databases.

    Parameters:
    - sql: SQL query to execute
    - connection_string: Database connection string
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from SQL query results
    """
```
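For example, running a query against a hypothetical PostgreSQL database (the connection string is a placeholder):

```python
import daft

# The query executes in the database; results come back as a DataFrame
users_df = daft.read_sql(
    "SELECT id, name, signup_date FROM users WHERE active = true",
    "postgresql://user:password@localhost:5432/appdb",
)
```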
### HuggingFace Datasets

Read datasets from HuggingFace Hub.

```python { .api }
def read_huggingface(
    path: str,
    split: Optional[str] = None,
    streaming: bool = False,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read HuggingFace datasets.

    Parameters:
    - path: Dataset name or path on HuggingFace Hub
    - split: Dataset split to read (train, test, validation)
    - streaming: Enable streaming mode for large datasets
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from HuggingFace dataset
    """
```
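A sketch of streaming the train split of a hypothetical Hub dataset, following the signature above:

```python
import daft

# Hypothetical dataset on the Hub; streaming avoids a full download
ds_df = daft.read_huggingface(
    "username/my-dataset",
    split="train",
    streaming=True,
)
```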
### Video Frames

Extract frames from video files for computer vision workloads.

```python { .api }
def read_video_frames(
    path: Union[str, List[str]],
    sample_rate: Optional[float] = None,
    frame_count: Optional[int] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read video frames as DataFrame.

    Parameters:
    - path: Video file path(s)
    - sample_rate: Frames per second to extract
    - frame_count: Maximum number of frames
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame with video frame data
    """
```
### Web Archives (WARC)

Read WARC files for web crawling and archival data.

```python { .api }
def read_warc(
    path: Union[str, List[str]],
    columns: Optional[List[str]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read WARC (Web ARChive) files.

    Parameters:
    - path: WARC file path(s)
    - columns: Specific columns to extract
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from WARC data
    """
```
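A sketch of reading crawl archives and keeping a few record-level fields (the paths and column names are illustrative):

```python
import daft

# Hypothetical compressed WARC segments from a crawl
warc_df = daft.read_warc(
    "crawl/segments/*.warc.gz",
    columns=["WARC-Target-URI", "Content-Type"],  # illustrative column names
)
```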
### MCAP Robotics Format

Read MCAP files for robotics and sensor data.

```python { .api }
def read_mcap(
    path: Union[str, List[str]],
    topics: Optional[List[str]] = None,
    io_config: Optional[IOConfig] = None,
    **kwargs
) -> DataFrame:
    """
    Read MCAP robotics format files.

    Parameters:
    - path: MCAP file path(s)
    - topics: Specific topics to read
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame from MCAP data
    """
```
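A sketch of reading selected topics from a hypothetical robot log, per the signature above:

```python
import daft

# Hypothetical MCAP recording; restrict to the listed topics
log_df = daft.read_mcap(
    "logs/run_2024_06_01.mcap",
    topics=["/imu/data", "/camera/front/image_raw"],
)
```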
### File Path Utilities

Create DataFrames from file system patterns or integer ranges.

```python { .api }
def from_glob_path(
    path: str,
    io_config: Optional[IOConfig] = None
) -> DataFrame:
    """
    Create DataFrame from file glob pattern.

    Parameters:
    - path: Glob pattern for files
    - io_config: IO configuration

    Returns:
        DataFrame: DataFrame with file metadata
    """

def range(n: int) -> DataFrame:
    """
    Create DataFrame with range of integers.

    Parameters:
    - n: Number of integers (0 to n-1)

    Returns:
        DataFrame: DataFrame with single integer column
    """
```
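A short sketch of both helpers (the bucket path is a placeholder):

```python
import daft

# List matching files as a DataFrame of file metadata
files_df = daft.from_glob_path("s3://my-bucket/images/**/*.jpeg")

# Generate a single-column DataFrame of integers 0..999
ids_df = daft.range(1000)
```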
## IO Configuration

### Core Configuration Classes

```python { .api }
class IOConfig:
    """General IO configuration settings."""
    def __init__(
        self,
        s3: Optional[S3Config] = None,
        azure: Optional[AzureConfig] = None,
        gcs: Optional[GCSConfig] = None,
        http: Optional[HTTPConfig] = None
    ): ...

class S3Config:
    """AWS S3 configuration."""
    def __init__(
        self,
        region_name: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        credentials: Optional[S3Credentials] = None,
        use_ssl: bool = True
    ): ...

class S3Credentials:
    """S3 authentication credentials."""
    def __init__(
        self,
        access_key_id: str,
        secret_access_key: str,
        session_token: Optional[str] = None
    ): ...

class AzureConfig:
    """Azure Blob Storage configuration."""
    def __init__(
        self,
        storage_account: Optional[str] = None,
        access_key: Optional[str] = None,
        sas_token: Optional[str] = None
    ): ...

class GCSConfig:
    """Google Cloud Storage configuration."""
    def __init__(
        self,
        project_id: Optional[str] = None,
        service_account_key: Optional[str] = None
    ): ...

class HTTPConfig:
    """HTTP client configuration."""
    def __init__(
        self,
        timeout: Optional[int] = None,
        max_retries: Optional[int] = None,
        headers: Optional[Dict[str, str]] = None
    ): ...
```
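The per-provider configs compose into a single `IOConfig`. A minimal sketch for Google Cloud Storage based on the class definitions above (the project ID and key path are placeholders); an S3 example follows in the usage section below:

```python
import daft
from daft.io import IOConfig, GCSConfig

# Hypothetical GCS project and service-account key path
gcs_config = GCSConfig(
    project_id="my-project",
    service_account_key="/secrets/gcs-key.json",
)
io_config = IOConfig(gcs=gcs_config)

df = daft.read_csv("gs://my-bucket/data/*.csv", io_config=io_config)
```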
## Usage Examples

### Cloud Storage Access

```python
import daft
from daft.io import IOConfig, S3Config, S3Credentials

# Configure S3 access
s3_config = S3Config(
    region_name="us-west-2",
    credentials=S3Credentials(
        access_key_id="your-key",
        secret_access_key="your-secret"
    )
)
io_config = IOConfig(s3=s3_config)

# Read from S3
df = daft.read_parquet(
    "s3://my-bucket/data/*.parquet",
    io_config=io_config
)
```
### Multi-Format Pipeline

```python
# Read from multiple sources
parquet_df = daft.read_parquet("data/raw/*.parquet")
csv_df = daft.read_csv("data/supplements.csv")
delta_df = daft.read_deltalake("s3://bucket/delta-table")

# Combine data (union_all requires the inputs to share a compatible schema)
combined = (
    parquet_df
    .union_all(csv_df)
    .union_all(delta_df)
    .collect()
)
```
### Optimized Reading

```python
from daft import col

# Use predicate pushdown for efficiency
filtered_df = daft.read_parquet(
    "s3://large-dataset/*.parquet",
    columns=["id", "name", "value"],
    predicate=(col("date") >= "2024-01-01") & (col("status") == "active"),
    io_config=io_config
)
```
### Video Processing

```python
# Extract video frames for ML
video_df = daft.read_video_frames(
    "videos/*.mp4",
    sample_rate=1.0,  # 1 frame per second
    frame_count=100   # Max 100 frames per video
)

# Process frames
processed = (
    video_df
    .select("filename", "frame_index", "image_data")
    .filter(col("frame_index") % 10 == 0)  # Every 10th frame
    .collect()
)
```
## Data Sources and Sinks

```python { .api }
class DataSource:
    """Abstract data source interface for custom readers."""

class DataSourceTask:
    """Represents a task for reading data from a source."""

class DataSink:
    """Abstract data sink interface for custom writers."""
```