# Dataset Management

Advanced features for working with partitioned datasets, including reading from and writing to multi-file parquet collections with directory-based partitioning and dataset-level operations.

## Capabilities

### Multi-File Dataset Operations

#### Dataset Merging

Combine multiple parquet files into a logical dataset.

```python { .api }
def merge(file_list, verify_schema=True, open_with=None, root=False):
    """
    Create logical dataset from multiple parquet files.

    The files must either be in the same directory or at the same level
    within a structured directory where directories provide partitioning
    information. Schemas should be consistent across files.

    Parameters:
    - file_list: list, paths to parquet files or ParquetFile instances
    - verify_schema: bool, verify schema consistency across all files
    - open_with: function, file opening function with (path, mode) signature
    - root: str, dataset root directory for partitioning disambiguation

    Returns:
    ParquetFile: Merged dataset representation with combined metadata
    """
```

#### Metadata Consolidation

Create unified metadata from multiple parquet files.

```python { .api }
def metadata_from_many(file_list, verify_schema=False, open_with=None,
                       root=False, fs=None):
    """
    Create FileMetaData that points to multiple parquet files.

    Parameters:
    - file_list: list, paths to parquet files to consolidate
    - verify_schema: bool, assert that schemas in each file are identical
    - open_with: function, file opening function
    - root: str, top directory of dataset tree
    - fs: fsspec.AbstractFileSystem, filesystem interface

    Returns:
    tuple: (basepath, FileMetaData) with consolidated metadata
    """
```
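
For orientation, a minimal sketch of consolidating metadata from a couple of files; the file names are illustrative, and the import path assumes `metadata_from_many` is exposed from `fastparquet.util` alongside the other utilities used in the examples below:

```python
from fastparquet.util import metadata_from_many

# Illustrative file list; any set of schema-compatible parquet files works
files = ['dataset/part.0.parquet', 'dataset/part.1.parquet']

# Consolidate per-file metadata into a single FileMetaData object
basepath, fmd = metadata_from_many(files, verify_schema=True)

print(basepath)             # common base directory of the inputs
print(len(fmd.row_groups))  # row groups collected from all files
```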

### Partitioned Dataset Management

#### Path Analysis

Analyze file paths to determine partitioning schemes and extract partition information.

```python { .api }
def analyse_paths(file_list, root=False):
    """
    Consolidate list of file paths into parquet relative paths.

    Parameters:
    - file_list: list, file paths to analyze
    - root: str or False, base directory path

    Returns:
    tuple: (basepath, relative_paths) with common base and relative paths
    """

def get_file_scheme(paths):
    """
    Determine partitioning scheme from file paths.

    Parameters:
    - paths: list, file paths to analyze (from row_group.columns[0].file_path)

    Returns:
    str: Partitioning scheme ('empty', 'simple', 'flat', 'hive', 'drill', 'other')
    """
```

#### Partition Functions

Utility functions for working with partitioned datasets.

```python { .api }
def partitions(row_group, only_values=False):
    """
    Extract partition values from row group file path.

    Parameters:
    - row_group: RowGroup object or str, row group or file path
    - only_values: bool, return only values (True) or full path (False)

    Returns:
    str: Partition values separated by '/' or full partition path
    """

def part_ids(row_groups):
    """
    Extract part file IDs from row group file paths.

    Finds integers matching "**part.*.parquet" pattern in paths.

    Parameters:
    - row_groups: list, row group objects to analyze

    Returns:
    dict: Mapping from part ID to (row_group_id, part_name) tuple
    """

def groupby_types(cats):
    """
    Group partitioning categories by their data types.

    Parameters:
    - cats: dict, partition categories mapping column names to values

    Returns:
    dict: Categories grouped by inferred type
    """

def check_column_names(columns, path):
    """
    Validate column names for parquet compatibility.

    Parameters:
    - columns: list, column names to validate
    - path: str, file path for error reporting

    Returns:
    list: Validated column names

    Raises:
    ValueError: If column names are invalid
    """

def ex_from_sep(sep):
    """
    Create regular expression from path separator.

    Parameters:
    - sep: str, path separator character

    Returns:
    re.Pattern: Regular expression for path matching
    """
```
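
As a rough illustration of the helpers above (a sketch only: the hive-style path is made up, the expected outputs are approximate, and the imports assume these functions are exposed from `fastparquet.util`):

```python
from fastparquet.util import partitions, groupby_types

# Illustrative hive-style path for one row group's data file
path = 'year=2024/month=1/part.0.parquet'

print(partitions(path, only_values=True))  # partition values, e.g. '2024/1'
print(partitions(path))                    # full partition path, e.g. 'year=2024/month=1'

# Group partition categories by their inferred types
print(groupby_types({'year': [2023, 2024], 'label': ['a', 'b']}))
```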

### Dataset Row Group Management

#### Row Group Addition

Add new row groups to existing datasets.

```python { .api }
# ParquetFile method
def write_row_groups(self, data, row_group_offsets=None, sort_key=None,
                     sort_pnames=False, compression=None, write_fmd=True,
                     open_with=None, mkdirs=None, stats="auto"):
    """
    Write data as new row groups to existing dataset.

    Parameters:
    - data: pandas.DataFrame or iterable, data to add to dataset
    - row_group_offsets: int or list, row group size specification
    - sort_key: function, sorting function for row group ordering
    - sort_pnames: bool, align part file names with row group positions
    - compression: str or dict, compression settings
    - write_fmd: bool, write updated common metadata to disk
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    - stats: bool or list, statistics calculation control
    """
```

#### Row Group Removal

Remove row groups from datasets and update metadata.

```python { .api }
# ParquetFile method
def remove_row_groups(self, rgs, sort_pnames=False, write_fmd=True,
                      open_with=None, remove_with=None):
    """
    Remove row groups from disk and update metadata.

    Cannot be applied to simple file scheme datasets. Removes files
    and updates common metadata accordingly.

    Parameters:
    - rgs: RowGroup or list, row group(s) to remove
    - sort_pnames: bool, align part file names after removal
    - write_fmd: bool, write updated common metadata
    - open_with: function, file opening function
    - remove_with: function, file removal function
    """
```

### Dataset Overwriting and Updates

#### Partition Overwriting

Replace existing partitions with new data while preserving other partitions.

```python { .api }
def overwrite(dirpath, data, row_group_offsets=None, sort_pnames=True,
              compression=None, open_with=None, mkdirs=None,
              remove_with=None, stats=True):
    """
    Overwrite partitions in an existing hive-formatted parquet dataset.

    Row groups whose partition values overlap the new data are removed
    before the new data is added. Only the overwrite_partitioned mode
    is currently supported.

    Parameters:
    - dirpath: str, directory path to parquet dataset
    - data: pandas.DataFrame, new data to write
    - row_group_offsets: int or list, row group size specification
    - sort_pnames: bool, align part file names with positions
    - compression: str or dict, compression settings
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    - remove_with: function, file removal function
    - stats: bool or list, statistics calculation control
    """
```

### Common Metadata Management

#### Metadata File Writing

Write and manage common metadata files for multi-file datasets.

```python { .api }
def write_common_metadata(fn, fmd, open_with=None, no_row_groups=True):
    """
    Write parquet schema information to shared metadata file.

    For hive-style parquet datasets, creates _metadata and _common_metadata
    files containing schema and file organization information.

    Parameters:
    - fn: str, metadata file path to write
    - fmd: FileMetaData, metadata information to write
    - open_with: function, file opening function
    - no_row_groups: bool, exclude row group info (for _common_metadata)
    """

def consolidate_categories(fmd):
    """
    Consolidate categorical metadata across row groups.

    Updates pandas metadata to reflect the maximum number of categories
    found across all row groups for each categorical column.

    Parameters:
    - fmd: FileMetaData, metadata to consolidate
    """
```
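
A sketch of producing dataset-level metadata files from consolidated metadata; the file list is illustrative, and the import locations (`fastparquet.util`, `fastparquet.writer`) are assumptions based on the imports used in the examples below:

```python
from fastparquet.util import metadata_from_many, join_path
from fastparquet.writer import write_common_metadata

# Illustrative inputs: files that belong to one hive-style dataset
files = ['dataset/part.0.parquet', 'dataset/part.1.parquet']
basepath, fmd = metadata_from_many(files, verify_schema=True)

# _metadata keeps the row-group index; _common_metadata carries schema only
write_common_metadata(join_path(basepath, '_metadata'), fmd, no_row_groups=False)
write_common_metadata(join_path(basepath, '_common_metadata'), fmd, no_row_groups=True)
```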

#### Metadata Updates

Update metadata in existing files without rewriting data.

```python { .api }
def update_custom_metadata(obj, custom_metadata):
    """
    Update custom metadata in thrift object or parquet file.

    Supports adding, updating, and removing (with None values) metadata
    entries without affecting other metadata or data contents.

    Parameters:
    - obj: ThriftObject or ParquetFile, target for metadata update
    - custom_metadata: dict, metadata updates to apply
    """
```

### File System Integration

#### File System Utilities

Functions for working with different file systems and storage backends.

```python { .api }
def get_fs(fn, open_with, mkdirs):
    """
    Get filesystem object and normalize parameters from file path.

    Detects and configures the appropriate filesystem interface for a
    given path, supporting local files, cloud storage, and custom backends.

    Parameters:
    - fn: str, file path or URL
    - open_with: function, file opening function
    - mkdirs: function, directory creation function

    Returns:
    tuple: (filesystem, normalized_path, open_function, mkdir_function)
    """

def join_path(*path):
    """
    Join path components with forward slashes.

    Parameters:
    - *path: str, path components to join

    Returns:
    str: Joined path with forward slash separators
    """
```
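
A brief sketch of pairing these utilities with an fsspec filesystem; the S3 bucket name and the choice of `fs.open` as the `open_with` callable are illustrative assumptions, not requirements:

```python
import fsspec
from fastparquet import ParquetFile
from fastparquet.util import join_path

fs = fsspec.filesystem('s3')  # or 'file', 'gcs', ...

# join_path always joins with forward slashes, regardless of platform
meta_path = join_path('my-bucket', 'dataset', '_metadata')

# Hand the filesystem's open method to fastparquet as open_with
pf = ParquetFile(meta_path, open_with=fs.open)
print(pf.columns)
```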

## Usage Examples

### Working with Multi-File Datasets

```python
from fastparquet import merge, ParquetFile

# Merge multiple parquet files into a logical dataset
file_list = [
    'part_000.parquet',
    'part_001.parquet',
    'part_002.parquet'
]

# Create merged dataset
merged_pf = merge(file_list, verify_schema=True)

# Read from merged dataset
df = merged_pf.to_pandas()
print(f"Total rows: {merged_pf.count()}")
print(f"File scheme: {merged_pf.file_scheme}")
```

### Managing Partitioned Datasets

```python
import pandas as pd

# Work with hive-style partitioned dataset
partitioned_pf = ParquetFile('/path/to/partitioned/dataset/')

# Check partitioning information
print(f"Partitions: {list(partitioned_pf.cats.keys())}")
print(f"Partition values: {partitioned_pf.cats}")

# Add new data to partitioned dataset
new_data = pd.DataFrame({
    'id': range(100, 200),
    'value': range(200, 300),
    'year': [2024] * 100,
    'month': [1] * 50 + [2] * 50
})

partitioned_pf.write_row_groups(
    new_data,
    compression='SNAPPY',
    stats=True
)
```

### Row Group Management

```python
# Remove specific row groups
pf = ParquetFile('dataset/')

# Select row groups to remove, e.g. everything from the year=2023 partition
# (adapt the predicate to your own partitioning layout)
old_rgs = [
    rg for rg in pf.row_groups
    if rg.columns[0].file_path.startswith('year=2023/')
]

# Remove old row groups
pf.remove_row_groups(
    old_rgs,
    sort_pnames=True,  # Realign part file names
    write_fmd=True     # Update metadata
)

# Add new row groups (new_data: a pandas.DataFrame matching the dataset schema)
pf.write_row_groups(new_data, compression='GZIP')
```

### Dataset Overwriting

```python
from fastparquet import overwrite

# Overwrite specific partitions with new data
new_partition_data = pd.DataFrame({
    'id': range(1000),
    'value': range(1000),
    'year': [2024] * 1000,
    'month': [3] * 1000  # This will overwrite month=3 partition
})

overwrite(
    'partitioned_dataset/',
    new_partition_data,
    compression='SNAPPY',
    sort_pnames=True
)
```

### Custom Metadata Management

```python
from fastparquet.util import update_custom_metadata
from fastparquet.writer import update_file_custom_metadata

# Update metadata in ParquetFile object
pf = ParquetFile('data.parquet')
update_custom_metadata(pf, {
    'processing_version': '2.0',
    'last_updated': '2024-01-15',
    'deprecated_field': None  # Remove this field
})

# Update metadata directly in file
update_file_custom_metadata('data.parquet', {
    'created_by': 'updated_application',
    'schema_version': '1.2'
})
```

### Working with File Schemes

```python
from fastparquet.util import get_file_scheme, analyse_paths

# Analyze file organization
file_paths = [
    'year=2023/month=1/part_000.parquet',
    'year=2023/month=2/part_001.parquet',
    'year=2024/month=1/part_002.parquet'
]

scheme = get_file_scheme(file_paths)
print(f"Detected scheme: {scheme}")  # Output: 'hive'

# Analyze path structure
basepath, relative_paths = analyse_paths(file_paths)
print(f"Base path: {basepath}")
print(f"Relative paths: {relative_paths}")
```

## Type Definitions

```python { .api }
from typing import Any, Callable, Dict, List, Literal, Tuple, Union

# File scheme types
FileScheme = Literal['empty', 'simple', 'flat', 'hive', 'drill', 'other']

# Row group specification
RowGroupSpec = Union[int, List[int]]

# File system interface
FileSystemInterface = Any  # fsspec.AbstractFileSystem compatible

# File opening function
OpenFunction = Callable[[str, str], Any]  # (path, mode) -> file-like

# Directory creation function
MkdirsFunction = Callable[[str], None]  # (path) -> None

# File removal function
RemoveFunction = Callable[[List[str]], None]  # (paths) -> None

# Sort key function for row groups
SortKeyFunction = Callable[[Any], Union[int, str]]  # (row_group) -> sort_key

# Partition information
PartitionInfo = Dict[str, List[Any]]  # column_name -> list of values

# Path analysis result
PathAnalysis = Tuple[str, List[str]]  # (basepath, relative_paths)
```