# Dataset Management

Advanced features for working with partitioned datasets, including reading from and writing to multi-file parquet collections with directory-based partitioning and dataset-level operations.

## Capabilities

### Multi-File Dataset Operations

#### Dataset Merging

Combine multiple parquet files into a single logical dataset.

```python { .api }
def merge(file_list, verify_schema=True, open_with=None, root=False):
    """
    Create a logical dataset from multiple parquet files.

    The files must either be in the same directory or at the same level
    within a structured directory tree, where the directories provide
    partitioning information. Schemas should be consistent across files.

    Parameters:
    - file_list: list, paths to parquet files or ParquetFile instances
    - verify_schema: bool, verify schema consistency across all files
    - open_with: function, file opening function with (path, mode) signature
    - root: str or False, dataset root directory for partitioning disambiguation

    Returns:
    ParquetFile: Merged dataset representation with combined metadata
    """
```

#### Metadata Consolidation

Create unified metadata from multiple parquet files.

```python { .api }
def metadata_from_many(file_list, verify_schema=False, open_with=None,
                       root=False, fs=None):
    """
    Create FileMetaData that points to multiple parquet files.

    Parameters:
    - file_list: list, paths to parquet files to consolidate
    - verify_schema: bool, assert that schemas in each file are identical
    - open_with: function, file opening function
    - root: str or False, top directory of the dataset tree
    - fs: fsspec.AbstractFileSystem, filesystem interface

    Returns:
    tuple: (basepath, FileMetaData) with consolidated metadata
    """
```
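
For example, the sketch below consolidates the footers of several part files into shared metadata files. It assumes local files and that `metadata_from_many` and `write_common_metadata` live in `fastparquet.util` and `fastparquet.writer`; treat it as a minimal sketch rather than the definitive API.

```python
from fastparquet.util import metadata_from_many       # assumed module location
from fastparquet.writer import write_common_metadata  # assumed module location

# Hypothetical part files belonging to one dataset directory
parts = ['dataset/part.0.parquet', 'dataset/part.1.parquet']

# Consolidate the per-file footers into a single FileMetaData object
basepath, fmd = metadata_from_many(parts, verify_schema=True)

# Persist the shared metadata next to the data files
write_common_metadata(f"{basepath}/_metadata", fmd, no_row_groups=False)
write_common_metadata(f"{basepath}/_common_metadata", fmd)
```

Opening the dataset directory with `ParquetFile` afterwards should pick up the consolidated `_metadata` file instead of reading each footer individually.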

### Partitioned Dataset Management

#### Path Analysis

Analyze file paths to determine partitioning schemes and extract partition information.

```python { .api }
def analyse_paths(file_list, root=False):
    """
    Consolidate list of file paths into parquet relative paths.

    Parameters:
    - file_list: list, file paths to analyze
    - root: str or False, base directory path

    Returns:
    tuple: (basepath, relative_paths) with common base and relative paths
    """

def get_file_scheme(paths):
    """
    Determine partitioning scheme from file paths.

    Parameters:
    - paths: list, file paths to analyze (from row_group.columns[0].file_path)

    Returns:
    str: Partitioning scheme ('empty', 'simple', 'flat', 'hive', 'drill', 'other')
    """
```

#### Partition Functions

Utility functions for working with partitioned datasets.

```python { .api }
def partitions(row_group, only_values=False):
    """
    Extract partition values from row group file path.

    Parameters:
    - row_group: RowGroup object or str, row group or file path
    - only_values: bool, return only values (True) or full path (False)

    Returns:
    str: Partition values separated by '/' or full partition path
    """

def part_ids(row_groups):
    """
    Extract part file IDs from row group file paths.

    Finds integers matching "**part.*.parquet" pattern in paths.

    Parameters:
    - row_groups: list, row group objects to analyze

    Returns:
    dict: Mapping from part ID to (row_group_id, part_name) tuple
    """

def groupby_types(cats):
    """
    Group partitioning categories by their data types.

    Parameters:
    - cats: dict, partition categories mapping column names to values

    Returns:
    dict: Categories grouped by inferred type
    """

def check_column_names(columns, path):
    """
    Validate column names for parquet compatibility.

    Parameters:
    - columns: list, column names to validate
    - path: str, file path for error reporting

    Returns:
    list: Validated column names

    Raises:
    ValueError: If column names are invalid
    """

def ex_from_sep(sep):
    """
    Create regular expression from path separator.

    Parameters:
    - sep: str, path separator character

    Returns:
    re.Pattern: Regular expression for path matching
    """
```
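
For orientation, here is a minimal sketch of how `partitions` behaves according to the description above; the `fastparquet.util` import location and the exact return strings are assumptions, so verify against your installed version.

```python
from fastparquet.util import partitions  # assumed module location

# A hive-style file path as produced for a partitioned dataset
path = 'year=2024/month=1/part.0.parquet'

# Per the docstring above: full partition path, or only the values
print(partitions(path))                    # expected: 'year=2024/month=1'
print(partitions(path, only_values=True))  # expected: '2024/1'
```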

### Dataset Row Group Management

#### Row Group Addition

Add new row groups to existing datasets.

```python { .api }
# ParquetFile method
def write_row_groups(self, data, row_group_offsets=None, sort_key=None,
                     sort_pnames=False, compression=None, write_fmd=True,
                     open_with=None, mkdirs=None, stats="auto"):
    """
    Write data as new row groups to an existing dataset.

    Parameters:
    - data: pandas.DataFrame or iterable, data to add to dataset
    - row_group_offsets: int or list, row group size specification
    - sort_key: function, sorting function for row group ordering
    - sort_pnames: bool, align part file names with row group positions
    - compression: str or dict, compression settings
    - write_fmd: bool, write updated common metadata to disk
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    - stats: bool, list of column names, or "auto", statistics calculation control
    """
```

#### Row Group Removal

Remove row groups from datasets and update metadata.

```python { .api }
# ParquetFile method
def remove_row_groups(self, rgs, sort_pnames=False, write_fmd=True,
                      open_with=None, remove_with=None):
    """
    Remove row groups from disk and update metadata.

    Cannot be applied to simple file scheme datasets. Removes files
    and updates common metadata accordingly.

    Parameters:
    - rgs: RowGroup or list, row group(s) to remove
    - sort_pnames: bool, align part file names after removal
    - write_fmd: bool, write updated common metadata
    - open_with: function, file opening function
    - remove_with: function, file removal function
    """
```

### Dataset Overwriting and Updates

#### Partition Overwriting

Replace existing partitions with new data while preserving other partitions.

```python { .api }
def overwrite(dirpath, data, row_group_offsets=None, sort_pnames=True,
              compression=None, open_with=None, mkdirs=None,
              remove_with=None, stats=True):
    """
    Overwrite partitions in an existing hive-formatted parquet dataset.

    Row groups whose partition values overlap with the new data are
    removed before the new data is added. Only the overwrite_partitioned
    mode is currently supported.

    Parameters:
    - dirpath: str, directory path to parquet dataset
    - data: pandas.DataFrame, new data to write
    - row_group_offsets: int or list, row group size specification
    - sort_pnames: bool, align part file names with positions
    - compression: str or dict, compression settings
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    - remove_with: function, file removal function
    - stats: bool or list, statistics calculation control
    """
```

### Common Metadata Management

#### Metadata File Writing

Write and manage common metadata files for multi-file datasets.

```python { .api }
def write_common_metadata(fn, fmd, open_with=None, no_row_groups=True):
    """
    Write parquet schema information to a shared metadata file.

    For hive-style parquet datasets, creates the _metadata and
    _common_metadata files containing schema and file organization
    information.

    Parameters:
    - fn: str, metadata file path to write
    - fmd: FileMetaData, metadata information to write
    - open_with: function, file opening function
    - no_row_groups: bool, exclude row group info (for _common_metadata)
    """

def consolidate_categories(fmd):
    """
    Consolidate categorical metadata across row groups.

    Updates pandas metadata to reflect the maximum number of categories
    found across all row groups for each categorical column.

    Parameters:
    - fmd: FileMetaData, metadata to consolidate
    """
```
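
As an illustration, the sketch below consolidates category counts on an open dataset and rewrites its shared metadata files. The module locations (`fastparquet.writer`) and the `dataset/` layout are assumptions; adapt as needed.

```python
from fastparquet import ParquetFile
from fastparquet.writer import consolidate_categories, write_common_metadata  # assumed locations

# Hypothetical multi-file dataset containing categorical columns
pf = ParquetFile('dataset/')

# Make the pandas metadata reflect the largest category count seen in any row group
consolidate_categories(pf.fmd)

# Rewrite the shared metadata; row-group details belong in _metadata only
write_common_metadata('dataset/_metadata', pf.fmd, no_row_groups=False)
write_common_metadata('dataset/_common_metadata', pf.fmd)
```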

#### Metadata Updates

Update metadata in existing files without rewriting data.

```python { .api }
def update_custom_metadata(obj, custom_metadata):
    """
    Update custom metadata in thrift object or parquet file.

    Supports adding, updating, and removing (with None values) metadata
    entries without affecting other metadata or data contents.

    Parameters:
    - obj: ThriftObject or ParquetFile, target for metadata update
    - custom_metadata: dict, metadata updates to apply
    """
```

### File System Integration

#### File System Utilities

Functions for working with different file systems and storage backends.

```python { .api }
def get_fs(fn, open_with, mkdirs):
    """
    Get a filesystem object and normalize parameters from a file path.

    Detects and configures the appropriate filesystem interface for the
    given path, supporting local files, cloud storage, and custom backends.

    Parameters:
    - fn: str, file path or URL
    - open_with: function, file opening function
    - mkdirs: function, directory creation function

    Returns:
    tuple: (filesystem, normalized_path, open_function, mkdir_function)
    """

def join_path(*path):
    """
    Join path components with forward slashes.

    Parameters:
    - *path: str, path components to join

    Returns:
    str: Joined path with forward slash separators
    """
```
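
A small sketch of `join_path`, which is useful when building dataset paths that must use forward slashes regardless of the local OS; the `fastparquet.util` import location is an assumption consistent with the usage examples below.

```python
from fastparquet.util import join_path  # assumed module location

# Components are joined with "/" so the result works for local and remote stores alike
print(join_path('dataset', 'year=2024', 'part.0.parquet'))
# dataset/year=2024/part.0.parquet
```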

## Usage Examples

### Working with Multi-File Datasets

```python
from fastparquet import merge, ParquetFile

# Merge multiple parquet files into logical dataset
file_list = [
    'part_000.parquet',
    'part_001.parquet',
    'part_002.parquet',
]

# Create merged dataset
merged_pf = merge(file_list, verify_schema=True)

# Read from merged dataset
df = merged_pf.to_pandas()
print(f"Total rows: {merged_pf.count()}")
print(f"File scheme: {merged_pf.file_scheme}")
```

### Managing Partitioned Datasets

```python
import pandas as pd
from fastparquet import ParquetFile

# Work with hive-style partitioned dataset
partitioned_pf = ParquetFile('/path/to/partitioned/dataset/')

# Check partitioning information
print(f"Partitions: {list(partitioned_pf.cats.keys())}")
print(f"Partition values: {partitioned_pf.cats}")

# Add new data to partitioned dataset
new_data = pd.DataFrame({
    'id': range(100, 200),
    'value': range(200, 300),
    'year': [2024] * 100,
    'month': [1] * 50 + [2] * 50,
})

partitioned_pf.write_row_groups(
    new_data,
    compression='SNAPPY',
    stats=True,
)
```

### Row Group Management

```python
# Remove specific row groups
pf = ParquetFile('dataset/')

# Get row groups to remove (e.g., old data); some_condition is a placeholder predicate
old_rgs = [rg for rg in pf.row_groups if some_condition(rg)]

# Remove old row groups
pf.remove_row_groups(
    old_rgs,
    sort_pnames=True,  # Realign part file names
    write_fmd=True,    # Update metadata
)

# Add new row groups (new_data as defined in the previous example)
pf.write_row_groups(new_data, compression='GZIP')
```

### Dataset Overwriting

```python
import pandas as pd
from fastparquet import overwrite

# Overwrite specific partitions with new data
new_partition_data = pd.DataFrame({
    'id': range(1000),
    'value': range(1000),
    'year': [2024] * 1000,
    'month': [3] * 1000,  # This will overwrite the month=3 partition
})

overwrite(
    'partitioned_dataset/',
    new_partition_data,
    compression='SNAPPY',
    sort_pnames=True,
)
```

### Custom Metadata Management

```python
from fastparquet import ParquetFile
from fastparquet.util import update_custom_metadata
from fastparquet.writer import update_file_custom_metadata

# Update metadata in a ParquetFile object
pf = ParquetFile('data.parquet')
update_custom_metadata(pf, {
    'processing_version': '2.0',
    'last_updated': '2024-01-15',
    'deprecated_field': None,  # Remove this field
})

# Update metadata directly in the file
update_file_custom_metadata('data.parquet', {
    'created_by': 'updated_application',
    'schema_version': '1.2',
})
```

### Working with File Schemes

```python
from fastparquet.util import get_file_scheme, analyse_paths

# Analyze file organization
file_paths = [
    'year=2023/month=1/part_000.parquet',
    'year=2023/month=2/part_001.parquet',
    'year=2024/month=1/part_002.parquet',
]

scheme = get_file_scheme(file_paths)
print(f"Detected scheme: {scheme}")  # Output: 'hive'

# Analyze path structure
basepath, relative_paths = analyse_paths(file_paths)
print(f"Base path: {basepath}")
print(f"Relative paths: {relative_paths}")
```

## Type Definitions

```python { .api }
from typing import Any, Callable, Dict, List, Literal, Tuple, Union

# File scheme types
FileScheme = Literal['empty', 'simple', 'flat', 'hive', 'drill', 'other']

# Row group specification
RowGroupSpec = Union[int, List[int]]

# File system interface
FileSystemInterface = Any  # fsspec.AbstractFileSystem compatible

# File opening function
OpenFunction = Callable[[str, str], Any]  # (path, mode) -> file-like

# Directory creation function
MkdirsFunction = Callable[[str], None]  # (path) -> None

# File removal function
RemoveFunction = Callable[[List[str]], None]  # (paths) -> None

# Sort key function for row groups
SortKeyFunction = Callable[[Any], Union[int, str]]  # (row_group) -> sort_key

# Partition information
PartitionInfo = Dict[str, List[Any]]  # column_name -> list of values

# Path analysis result
PathAnalysis = Tuple[str, List[str]]  # (basepath, relative_paths)
```