# Dataset Operations
1
2
Multi-file dataset interface supporting partitioned data, lazy evaluation, and distributed processing. Enables efficient querying of large datasets stored across multiple files with automatic partition discovery, predicate pushdown, and columnar processing.
3
4
## Capabilities
5
6
### Dataset Creation and Management
7
8
Create and manage datasets from various sources including local files, cloud storage, and in-memory data.
9
10
```python { .api }
11
def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, exclude_invalid_files=None, ignore_prefixes=None):
12
"""
13
Create dataset from various sources.
14
15
Parameters:
16
- source: str, list, or Table - data source(s)
17
- schema: Schema, explicit schema
18
- format: FileFormat, file format
19
- filesystem: FileSystem, filesystem to use
20
- partitioning: Partitioning, partitioning scheme
21
- partition_base_dir: str, base directory for partitioning
22
- exclude_invalid_files: bool, exclude invalid files
23
- ignore_prefixes: list, prefixes to ignore
24
25
Returns:
26
Dataset: Dataset object for querying
27
"""
28
29
def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, partitioning_flavor=None, schema=None, filesystem=None, file_options=None, use_threads=True, max_partitions=None, max_open_files=None, max_rows_per_file=None, min_rows_per_group=None, max_rows_per_group=None, existing_data_behavior='error'):
30
"""
31
Write dataset to storage with partitioning.
32
33
Parameters:
34
- data: Table, dataset, or iterable of batches
35
- base_dir: str, base directory for output
36
- basename_template: str, template for file names
37
- format: FileFormat, output format
38
- partitioning: Partitioning, partitioning scheme
39
- partitioning_flavor: str, partitioning flavor ('hive', 'directory')
40
- schema: Schema, output schema
41
- filesystem: FileSystem, filesystem to use
42
- file_options: FileWriteOptions, format-specific options
43
- use_threads: bool, use multiple threads
44
- max_partitions: int, maximum number of partitions
45
- max_open_files: int, maximum open files
46
- max_rows_per_file: int, maximum rows per file
47
- min_rows_per_group: int, minimum rows per group
48
- max_rows_per_group: int, maximum rows per group
49
- existing_data_behavior: str, behavior for existing data
50
"""
51
52
class Dataset:
53
"""
54
Abstract dataset interface for reading tabular data.
55
56
Attributes:
57
- schema: Schema of the dataset
58
- partition_expression: Partition expression
59
"""
60
61
def count_rows(self, **kwargs):
62
"""Count total rows in dataset."""
63
64
def get_fragments(self, filter=None):
65
"""Get dataset fragments."""
66
67
def head(self, num_rows, **kwargs):
68
"""Get first num_rows as Table."""
69
70
def replace_schema(self, schema):
71
"""Return dataset with new schema."""
72
73
def scanner(self, **kwargs):
74
"""Create scanner for dataset."""
75
76
def take(self, indices, **kwargs):
77
"""Select rows by indices."""
78
79
def to_batches(self, **kwargs):
80
"""Convert to iterator of record batches."""
81
82
def to_table(self, **kwargs):
83
"""Convert entire dataset to Table."""
84
85
class FileSystemDataset(Dataset):
86
"""
87
Dataset backed by files in filesystem.
88
89
Attributes:
90
- files: List of dataset files
91
- filesystem: Filesystem object
92
- format: File format
93
- partitioning: Partitioning scheme
94
"""
95
96
class InMemoryDataset(Dataset):
97
"""Dataset backed by in-memory tables."""
98
99
class UnionDataset(Dataset):
100
"""Union of multiple datasets."""
101
```
102
103
### Dataset Scanning and Querying
104
105
Efficient scanning with projection, filtering, and lazy evaluation for large-scale data processing.
106
107
```python { .api }
108
class Scanner:
109
"""
110
Dataset scanner with filtering and projection.
111
112
Attributes:
113
- projected_schema: Projected schema
114
- filter: Applied filter expression
115
"""
116
117
def count_rows(self):
118
"""Count rows matching scanner criteria."""
119
120
def head(self, num_rows):
121
"""Get first num_rows as Table."""
122
123
def take(self, indices):
124
"""Select rows by indices."""
125
126
def to_batches(self):
127
"""Scan to iterator of record batches."""
128
129
def to_reader(self):
130
"""Convert to RecordBatchReader."""
131
132
def to_table(self):
133
"""Scan entire dataset to Table."""
134
135
def scan(self):
136
"""Perform scan operation."""
137
138
class TaggedRecordBatch:
139
"""Record batch with partition information."""
140
141
@property
142
def record_batch(self):
143
"""Get record batch."""
144
145
@property
146
def fragment(self):
147
"""Get source fragment."""
148
```
149
150
### Partitioning
151
152
Automatic partition discovery and custom partitioning schemes for organizing large datasets.
153
154
```python { .api }
155
def partitioning(schema=None, field_names=None, flavor=None):
156
"""
157
Create partitioning scheme.
158
159
Parameters:
160
- schema: Schema, partitioning schema
161
- field_names: list, partition field names
162
- flavor: str, partitioning flavor ('hive', 'directory')
163
164
Returns:
165
Partitioning: Partitioning scheme
166
"""
167
168
def get_partition_keys(path, partitioning):
169
"""
170
Extract partition keys from path.
171
172
Parameters:
173
- path: str, file path
174
- partitioning: Partitioning, partitioning scheme
175
176
Returns:
177
dict: Partition key-value pairs
178
"""
179
180
class Partitioning:
181
"""
182
Abstract partitioning scheme.
183
184
Attributes:
185
- schema: Partitioning schema
186
"""
187
188
def parse(self, path):
189
"""Parse partition keys from path."""
190
191
def format(self, partition_keys):
192
"""Format partition keys as path."""
193
194
class PartitioningFactory:
195
"""Factory for creating partitioning schemes."""
196
197
def create(self, field_names):
198
"""Create partitioning from field names."""
199
200
def inspect(self, paths):
201
"""Inspect paths to infer partitioning."""
202
203
class DirectoryPartitioning(Partitioning):
204
"""
205
Directory-based partitioning (e.g., year=2021/month=01/).
206
"""
207
208
class HivePartitioning(Partitioning):
209
"""
210
Hive-style partitioning with key=value directories.
211
"""
212
213
class FilenamePartitioning(Partitioning):
214
"""
215
Partitioning based on filenames.
216
"""
217
```
218
219
### File Formats and Fragments
220
221
Support for multiple file formats with format-specific optimization and fragment-based processing.
222
223
```python { .api }
224
class FileFormat:
225
"""Abstract file format interface."""
226
227
def equals(self, other):
228
"""Check format equality."""
229
230
def get_type_name(self):
231
"""Get format type name."""
232
233
def make_fragment(self, file, filesystem=None, partition_expression=None):
234
"""Create fragment from file."""
235
236
class CsvFileFormat(FileFormat):
237
"""CSV file format implementation."""
238
239
def __init__(self, parse_options=None, read_options=None, convert_options=None): ...
240
241
class JsonFileFormat(FileFormat):
242
"""JSON file format implementation."""
243
244
def __init__(self, parse_options=None, read_options=None): ...
245
246
class IpcFileFormat(FileFormat):
247
"""Arrow IPC file format implementation."""
248
249
class FeatherFileFormat(FileFormat):
250
"""Feather file format implementation."""
251
252
class ParquetFileFormat(FileFormat):
253
"""
254
Parquet file format implementation.
255
256
Attributes:
257
- read_options: Parquet read options
258
"""
259
260
def __init__(self, read_options=None, **kwargs): ...
261
262
class OrcFileFormat(FileFormat):
263
"""ORC file format implementation."""
264
265
class Fragment:
266
"""
267
Abstract fragment representing part of dataset.
268
269
Attributes:
270
- format: File format
271
- partition_expression: Partition expression
272
"""
273
274
def count_rows(self, **kwargs):
275
"""Count rows in fragment."""
276
277
def head(self, num_rows, **kwargs):
278
"""Get first num_rows from fragment."""
279
280
def scanner(self, **kwargs):
281
"""Create scanner for fragment."""
282
283
def to_batches(self, **kwargs):
284
"""Convert fragment to record batches."""
285
286
def to_table(self, **kwargs):
287
"""Convert fragment to table."""
288
289
class FileFragment(Fragment):
290
"""
291
Fragment backed by single file.
292
293
Attributes:
294
- path: File path
295
- filesystem: Filesystem
296
"""
297
298
class ParquetFileFragment(FileFragment):
299
"""
300
Parquet-specific file fragment.
301
302
Attributes:
303
- row_groups: Row group information
304
"""
305
306
def get_row_group_fragments(self):
307
"""Get row group fragments."""
308
309
def split_by_row_group(self, **kwargs):
310
"""Split fragment by row groups."""
311
```
312
313
### Dataset Factories
314
315
Factory classes for creating datasets with automatic discovery and configuration.
316
317
```python { .api }
318
class DatasetFactory:
319
"""Abstract dataset factory."""
320
321
def finish(self, **kwargs):
322
"""Create dataset from factory."""
323
324
def inspect(self, **kwargs):
325
"""Inspect data sources."""
326
327
def inspect_schemas(self, **kwargs):
328
"""Inspect schemas from sources."""
329
330
class FileSystemDatasetFactory(DatasetFactory):
331
"""
332
Factory for filesystem-based datasets.
333
"""
334
335
def __init__(self, filesystem, paths_or_selector, format=None, options=None): ...
336
337
class UnionDatasetFactory(DatasetFactory):
338
"""Factory for union datasets."""
339
340
class ParquetDatasetFactory(DatasetFactory):
341
"""
342
Specialized factory for Parquet datasets.
343
"""
344
345
def __init__(self, metadata_path, schema=None, **kwargs): ...
346
```
347
348
### Write Support
349
350
Configuration and options for writing datasets with various formats and partitioning schemes.
351
352
```python { .api }
353
class FileWriteOptions:
354
"""Base file writing options."""
355
356
class IpcFileWriteOptions(FileWriteOptions):
357
"""IPC file writing options."""
358
359
class ParquetFileWriteOptions(FileWriteOptions):
360
"""
361
Parquet file writing options.
362
363
Attributes:
364
- write_batch_size: Batch size for writing
365
- dictionary_pagesize_limit: Dictionary page size limit
366
- data_page_size: Data page size
367
- compression: Compression codec
368
- compression_level: Compression level
369
- use_dictionary: Dictionary encoding settings
370
- write_statistics: Statistics writing settings
371
"""
372
373
class WrittenFile:
374
"""
375
Information about written file.
376
377
Attributes:
378
- path: File path
379
- metadata: File metadata
380
- size: File size
381
"""
382
```
383
384
## Usage Examples
385
386
### Basic Dataset Operations
387
388
```python
389
import pyarrow as pa
390
import pyarrow.dataset as ds
391
import pyarrow.parquet as pq
392
import tempfile
393
import os
394
395
# Create sample data
396
table1 = pa.table({
397
'year': [2020, 2020, 2021, 2021],
398
'month': [1, 2, 1, 2],
399
'sales': [100, 150, 120, 180],
400
'region': ['North', 'South', 'North', 'South']
401
})
402
403
table2 = pa.table({
404
'year': [2022, 2022],
405
'month': [1, 2],
406
'sales': [200, 220],
407
'region': ['North', 'South']
408
})
409
410
with tempfile.TemporaryDirectory() as tmpdir:
411
# Write individual Parquet files
412
pq.write_table(table1, os.path.join(tmpdir, 'data1.parquet'))
413
pq.write_table(table2, os.path.join(tmpdir, 'data2.parquet'))
414
415
# Create dataset from directory
416
dataset = ds.dataset(tmpdir, format='parquet')
417
418
# Basic dataset info
419
print(f"Schema: {dataset.schema}")
420
print(f"Files: {len(list(dataset.get_fragments()))}")
421
422
# Read entire dataset
423
full_table = dataset.to_table()
424
print(f"Total rows: {len(full_table)}")
425
426
# Count rows without loading data
427
row_count = dataset.count_rows()
428
print(f"Row count: {row_count}")
429
430
# Get first few rows
431
head_table = dataset.head(3)
432
print(f"Head: {head_table}")
433
434
# Scan with projection
435
projected = dataset.to_table(columns=['year', 'sales'])
436
print(f"Projected columns: {projected.column_names}")
437
438
# Scan with filter
439
filtered = dataset.to_table(filter=ds.field('year') >= 2021)
440
print(f"Filtered rows: {len(filtered)}")
441
442
# Iterator over batches
443
total_batches = 0
444
for batch in dataset.to_batches(batch_size=2):
445
total_batches += 1
446
print(f"Batch {total_batches}: {batch.num_rows} rows")
447
```
448
449
### Partitioned Datasets
450
451
```python
452
import pyarrow as pa
453
import pyarrow.dataset as ds
454
import pyarrow.parquet as pq
455
import tempfile
456
import os
457
458
# Create larger sample data
459
data = {
460
'year': [2020] * 100 + [2021] * 100 + [2022] * 100,
461
'month': ([1] * 50 + [2] * 50) * 3,
462
'day': list(range(1, 51)) * 6,
463
'sales': [100 + i for i in range(300)],
464
'region': (['North', 'South'] * 150)
465
}
466
large_table = pa.table(data)
467
468
with tempfile.TemporaryDirectory() as tmpdir:
469
# Write partitioned dataset
470
ds.write_dataset(
471
large_table,
472
tmpdir,
473
format='parquet',
474
partitioning=['year', 'month'],
475
partitioning_flavor='hive'
476
)
477
478
# List created files
479
for root, dirs, files in os.walk(tmpdir):
480
for file in files:
481
rel_path = os.path.relpath(os.path.join(root, file), tmpdir)
482
print(f"Created: {rel_path}")
483
484
# Read partitioned dataset
485
    partitioned_dataset = ds.dataset(tmpdir, format='parquet', partitioning='hive')
486
487
# Dataset automatically discovers partitioning
488
print(f"Partitioning: {partitioned_dataset.partitioning}")
489
print(f"Schema: {partitioned_dataset.schema}")
490
491
# Filter by partition
492
year_2021 = partitioned_dataset.to_table(
493
filter=ds.field('year') == 2021
494
)
495
print(f"Year 2021 rows: {len(year_2021)}")
496
497
# Multiple partition filters
498
specific_partition = partitioned_dataset.to_table(
499
filter=(ds.field('year') == 2021) & (ds.field('month') == 1)
500
)
501
print(f"2021-01 rows: {len(specific_partition)}")
502
503
# Get partition information
504
fragments = list(partitioned_dataset.get_fragments())
505
for i, fragment in enumerate(fragments[:3]): # First 3 fragments
506
print(f"Fragment {i}: {fragment.partition_expression}")
507
```
508
509
### Advanced Filtering and Projection
510
511
```python
512
import pyarrow as pa
513
import pyarrow.dataset as ds
514
import pyarrow.compute as pc
515
import tempfile
516
517
# Create dataset with complex data
518
table = pa.table({
519
'id': range(1000),
520
    'category': (['A', 'B', 'C'] * 334)[:1000],  # cycling categories, truncated to match length
521
'value': [i * 1.5 for i in range(1000)],
522
'timestamp': pa.array([
523
f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}'
524
for i in range(1000)
525
], type=pa.string()),
526
'tags': [['tag1', 'tag2'][:i % 3] for i in range(1000)]
527
})
528
529
with tempfile.TemporaryDirectory() as tmpdir:
530
# Write dataset
531
ds.write_dataset(table, tmpdir, format='parquet')
532
dataset = ds.dataset(tmpdir, format='parquet')
533
534
# Complex filters
535
complex_filter = (
536
(ds.field('category').isin(['A', 'B'])) &
537
(ds.field('value') > 500) &
538
(ds.field('id') % 10 == 0)
539
)
540
541
filtered_data = dataset.to_table(filter=complex_filter)
542
print(f"Complex filter result: {len(filtered_data)} rows")
543
544
# String operations in filters
545
    timestamp_filter = pc.match_substring(ds.field('timestamp'), '2023-01')
546
jan_data = dataset.to_table(filter=timestamp_filter)
547
print(f"January data: {len(jan_data)} rows")
548
549
# Scanner with custom settings
550
scanner = dataset.scanner(
551
columns=['id', 'category', 'value'],
552
filter=ds.field('value') > 750,
553
batch_size=100
554
)
555
556
# Process in batches
557
batch_count = 0
558
total_rows = 0
559
for batch in scanner.to_batches():
560
batch_count += 1
561
total_rows += batch.num_rows
562
if batch_count <= 3: # Show first 3 batches
563
print(f"Batch {batch_count}: {batch.num_rows} rows, "
564
f"value range: {pc.min(batch['value'])} - {pc.max(batch['value'])}")
565
566
print(f"Total batches: {batch_count}, total rows: {total_rows}")
567
```
568
569
### Multi-Format Datasets
570
571
```python
572
import pyarrow as pa
573
import pyarrow.dataset as ds
574
import pyarrow.parquet as pq
575
import pyarrow.csv as csv
576
import pyarrow.feather as feather
577
import tempfile
578
import os
579
580
# Create sample data
581
base_data = {
582
'id': range(100),
583
'name': [f'item_{i}' for i in range(100)],
584
'price': [10.0 + i * 0.5 for i in range(100)]
585
}
586
587
table = pa.table(base_data)
588
589
with tempfile.TemporaryDirectory() as tmpdir:
590
# Write in different formats
591
pq.write_table(table.slice(0, 30), os.path.join(tmpdir, 'data1.parquet'))
592
csv.write_csv(table.slice(30, 30), os.path.join(tmpdir, 'data2.csv'))
593
feather.write_feather(table.slice(60, 40), os.path.join(tmpdir, 'data3.feather'))
594
595
# Create datasets for each format
596
parquet_ds = ds.dataset(
597
os.path.join(tmpdir, 'data1.parquet'),
598
format='parquet'
599
)
600
601
csv_ds = ds.dataset(
602
os.path.join(tmpdir, 'data2.csv'),
603
format='csv'
604
)
605
606
feather_ds = ds.dataset(
607
os.path.join(tmpdir, 'data3.feather'),
608
format='ipc' # Feather uses IPC format
609
)
610
611
# Union datasets
612
    union_ds = ds.dataset([parquet_ds, csv_ds, feather_ds])  # creates a UnionDataset
613
614
# Read unified dataset
615
unified_table = union_ds.to_table()
616
print(f"Unified dataset rows: {len(unified_table)}")
617
print(f"Schema: {unified_table.schema}")
618
619
# Verify data integrity
620
assert len(unified_table) == 100
621
assert unified_table['id'].to_pylist() == list(range(100))
622
623
# Process by source
624
for i, fragment in enumerate(union_ds.get_fragments()):
625
fragment_table = fragment.to_table()
626
print(f"Fragment {i}: {len(fragment_table)} rows from {type(fragment).__name__}")
627
```
628
629
### Dataset Schema Evolution
630
631
```python
632
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.compute as pc
import tempfile
import os
637
638
with tempfile.TemporaryDirectory() as tmpdir:
639
# Version 1 schema
640
v1_table = pa.table({
641
'id': [1, 2, 3],
642
'name': ['Alice', 'Bob', 'Charlie'],
643
'value': [10.5, 20.3, 30.1]
644
})
645
646
# Version 2 schema (added column)
647
v2_table = pa.table({
648
'id': [4, 5, 6],
649
'name': ['Diana', 'Eve', 'Frank'],
650
'value': [40.7, 50.2, 60.8],
651
'category': ['A', 'B', 'A'] # New column
652
})
653
654
# Version 3 schema (changed type, added column)
655
v3_table = pa.table({
656
'id': [7, 8, 9],
657
'name': ['Grace', 'Henry', 'Iris'],
658
'value': [70.1, 80.9, 90.5],
659
'category': ['B', 'A', 'B'],
660
'timestamp': ['2023-01-01', '2023-01-02', '2023-01-03'] # Another new column
661
})
662
663
# Write different versions
664
pq.write_table(v1_table, os.path.join(tmpdir, 'v1.parquet'))
665
pq.write_table(v2_table, os.path.join(tmpdir, 'v2.parquet'))
666
pq.write_table(v3_table, os.path.join(tmpdir, 'v3.parquet'))
667
668
# Create dataset with schema evolution
669
dataset = ds.dataset(tmpdir, format='parquet')
670
671
# Dataset handles schema evolution automatically
672
print(f"Unified schema: {dataset.schema}")
673
674
# Read all data - missing columns filled with nulls
675
full_table = dataset.to_table()
676
print(f"Total rows: {len(full_table)}")
677
print(f"Columns: {full_table.column_names}")
678
679
# Show schema evolution effects
680
for col_name in full_table.column_names:
681
column = full_table[col_name]
682
null_count = pc.count(column, mode='only_null').as_py()
683
print(f"Column '{col_name}': {null_count} nulls out of {len(column)}")
684
685
# Handle schema evolution explicitly
686
# Define target schema
687
target_schema = pa.schema([
688
pa.field('id', pa.int64()),
689
pa.field('name', pa.string()),
690
pa.field('value', pa.float64()),
691
pa.field('category', pa.string()),
692
pa.field('timestamp', pa.string()),
693
pa.field('version', pa.int32()) # Add version tracking
694
])
695
696
# Project to target schema with computed column
697
projected = dataset.to_table(
698
schema=target_schema,
699
# Note: computed columns require more advanced techniques
700
# This example shows the concept
701
)
702
```
703
704
### Performance Optimization
705
706
```python
707
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import time
import os
712
713
# Create large dataset for performance testing
714
n_rows = 100000
715
large_table = pa.table({
716
'id': range(n_rows),
717
'category': ['A', 'B', 'C', 'D'] * (n_rows // 4),
718
'value1': [i * 1.1 for i in range(n_rows)],
719
'value2': [i * 2.2 for i in range(n_rows)],
720
'timestamp': pa.array([
721
f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}'
722
for i in range(n_rows)
723
])
724
})
725
726
with tempfile.TemporaryDirectory() as tmpdir:
727
# Write with different configurations
728
729
# Single file
730
start_time = time.time()
731
ds.write_dataset(
732
large_table,
733
os.path.join(tmpdir, 'single_file'),
734
format='parquet',
735
        basename_template='data-{i}.parquet'  # template must contain the '{i}' placeholder
736
)
737
single_file_write_time = time.time() - start_time
738
739
# Partitioned by category
740
start_time = time.time()
741
ds.write_dataset(
742
large_table,
743
os.path.join(tmpdir, 'partitioned'),
744
format='parquet',
745
partitioning=['category']
746
)
747
partitioned_write_time = time.time() - start_time
748
749
# Multiple files with row limit
750
start_time = time.time()
751
ds.write_dataset(
752
large_table,
753
os.path.join(tmpdir, 'multi_file'),
754
format='parquet',
755
max_rows_per_file=20000
756
)
757
multi_file_write_time = time.time() - start_time
758
759
print(f"Write times:")
760
print(f" Single file: {single_file_write_time:.3f}s")
761
print(f" Partitioned: {partitioned_write_time:.3f}s")
762
print(f" Multi-file: {multi_file_write_time:.3f}s")
763
764
# Compare read performance
765
datasets = {
766
'single_file': ds.dataset(os.path.join(tmpdir, 'single_file')),
767
'partitioned': ds.dataset(os.path.join(tmpdir, 'partitioned')),
768
'multi_file': ds.dataset(os.path.join(tmpdir, 'multi_file'))
769
}
770
771
# Full table read
772
print(f"\nFull table read times:")
773
for name, dataset in datasets.items():
774
start_time = time.time()
775
table = dataset.to_table()
776
read_time = time.time() - start_time
777
print(f" {name}: {read_time:.3f}s ({len(table)} rows)")
778
779
# Filtered read (category = 'A')
780
print(f"\nFiltered read times (category='A'):")
781
for name, dataset in datasets.items():
782
start_time = time.time()
783
filtered = dataset.to_table(filter=ds.field('category') == 'A')
784
read_time = time.time() - start_time
785
print(f" {name}: {read_time:.3f}s ({len(filtered)} rows)")
786
787
# Column projection
788
print(f"\nProjected read times (id, value1 only):")
789
for name, dataset in datasets.items():
790
start_time = time.time()
791
projected = dataset.to_table(columns=['id', 'value1'])
792
read_time = time.time() - start_time
793
print(f" {name}: {read_time:.3f}s")
794
```