# Data Management

MLflow's data management capabilities provide comprehensive dataset tracking, lineage, and versioning for machine learning workflows. The system supports a variety of data sources and formats, and provides robust dataset lifecycle management with automated schema inference and validation.

## Capabilities

### Dataset Creation and Loading

Core functions for creating and loading datasets from various sources, with automatic schema inference and metadata capture.

```python { .api }
def from_pandas(df, source=None, targets=None, name=None, digest=None, path=None):
    """
    Create Dataset from pandas DataFrame.

    Parameters:
    - df: pandas.DataFrame - Source DataFrame
    - source: DatasetSource, optional - Dataset source information
    - targets: str or list, optional - Target column names
    - name: str, optional - Dataset name
    - digest: str, optional - Dataset content digest
    - path: str, optional - Path for dataset storage

    Returns:
    Dataset object wrapping the DataFrame
    """

def from_numpy(features, source=None, targets=None, name=None, digest=None, path=None):
    """
    Create Dataset from numpy arrays.

    Parameters:
    - features: numpy.ndarray - Feature array
    - source: DatasetSource, optional - Dataset source information
    - targets: numpy.ndarray or str, optional - Target array or column name
    - name: str, optional - Dataset name
    - digest: str, optional - Dataset content digest
    - path: str, optional - Path for dataset storage

    Returns:
    Dataset object wrapping the arrays
    """

def from_spark(df, source=None, targets=None, name=None, digest=None, path=None):
    """
    Create Dataset from Spark DataFrame.

    Parameters:
    - df: pyspark.sql.DataFrame - Source Spark DataFrame
    - source: DatasetSource, optional - Dataset source information
    - targets: str or list, optional - Target column names
    - name: str, optional - Dataset name
    - digest: str, optional - Dataset content digest
    - path: str, optional - Path for dataset storage

    Returns:
    Dataset object wrapping the Spark DataFrame
    """

def from_delta(table_name, version=None, timestamp=None, source=None, targets=None, name=None, digest=None, path=None):
    """
    Create Dataset from Delta table.

    Parameters:
    - table_name: str - Delta table name or path
    - version: int, optional - Specific table version
    - timestamp: str, optional - Time travel timestamp
    - source: DatasetSource, optional - Dataset source information
    - targets: str or list, optional - Target column names
    - name: str, optional - Dataset name
    - digest: str, optional - Dataset content digest
    - path: str, optional - Path for dataset storage

    Returns:
    Dataset object referencing Delta table
    """

def from_huggingface(path, source=None, targets=None, name=None, digest=None):
    """
    Create Dataset from Hugging Face dataset.

    Parameters:
    - path: str - Hugging Face dataset path or name
    - source: DatasetSource, optional - Dataset source information
    - targets: str or list, optional - Target column names
    - name: str, optional - Dataset name
    - digest: str, optional - Dataset content digest

    Returns:
    Dataset object wrapping Hugging Face dataset
    """

def load_delta(table_name, version=None, timestamp=None):
    """
    Load Delta table as Dataset.

    Parameters:
    - table_name: str - Delta table name or path
    - version: int, optional - Specific table version
    - timestamp: str, optional - Time travel timestamp

    Returns:
    Dataset object with Delta table data
    """
```
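
The digest field is what ties a logged dataset to its exact contents. As a quick illustration (a minimal sketch, assuming the default content-based digest computation), two datasets built from identical DataFrames should yield identical digests:

```python
import mlflow.data
import pandas as pd

# Two DataFrames with identical content
df_a = pd.DataFrame({"x": [1, 2, 3], "y": [0, 1, 0]})
df_b = pd.DataFrame({"x": [1, 2, 3], "y": [0, 1, 0]})

ds_a = mlflow.data.from_pandas(df_a, targets="y", name="digest_demo")
ds_b = mlflow.data.from_pandas(df_b, targets="y", name="digest_demo")

# Digests are computed from content, so identical data should produce
# identical digests
print(ds_a.digest == ds_b.digest)  # expected: True
```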

### Dataset Sources and Registry

Functions for managing dataset sources, registering datasets, and maintaining dataset metadata.

```python { .api }
def get_source(dataset):
    """
    Get source information from dataset.

    Parameters:
    - dataset: Dataset, DatasetEntity, or DatasetInput - Dataset object

    Returns:
    DatasetSource object with source metadata
    """

def get_registered_sources():
    """
    Get all registered dataset source types.

    Returns:
    List of registered DatasetSource classes
    """

def get_dataset_source_from_json(source_json, source_type):
    """
    Reconstruct DatasetSource from JSON representation.

    Parameters:
    - source_json: str - JSON representation of source
    - source_type: str - Type of dataset source

    Returns:
    DatasetSource object reconstructed from JSON
    """
```
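
A short sketch of these helpers in combination (the dataset name is illustrative): list the registered source types, then pull and serialize the source of a freshly created dataset.

```python
import mlflow.data
import pandas as pd

# List the dataset source types currently registered
for source_cls in mlflow.data.get_registered_sources():
    print(source_cls)

# Retrieve the source attached to a dataset
df = pd.DataFrame({"x": [1, 2], "y": [0, 1]})
dataset = mlflow.data.from_pandas(df, targets="y", name="source_demo")
source = mlflow.data.get_source(dataset)

# Sources serialize to JSON for storage alongside run metadata
print(source.to_json())
```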

### Dataset Logging and Tracking

Functions for logging datasets to MLflow runs with automatic lineage tracking and metadata capture.

```python { .api }
def log_input(dataset, context=None, tags=None):
    """
    Log dataset as input to current MLflow run.

    Parameters:
    - dataset: Dataset - Dataset to log as input
    - context: str, optional - Context for dataset usage (train, validation, test)
    - tags: dict, optional - Additional tags for dataset input
    """
```
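
Inputs logged this way can be read back from a finished run for lineage auditing. A minimal sketch, assuming the standard tracking APIs (`mlflow.get_run` and the run's `inputs.dataset_inputs` collection):

```python
import mlflow

with mlflow.start_run() as run:
    # ... mlflow.log_input(...) calls as shown below ...
    run_id = run.info.run_id

# Read the logged dataset inputs back from the finished run
finished = mlflow.get_run(run_id)
for dataset_input in finished.inputs.dataset_inputs:
    entity = dataset_input.dataset
    print(entity.name, entity.digest, entity.source_type)
```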

### Dataset Validation and Schema

Classes and functions for dataset schema management, validation, and type checking.

```python { .api }
class Dataset:
    def __init__(self, df=None, source=None, targets=None, name=None, digest=None, path=None):
        """
        Core Dataset class for data management.

        Parameters:
        - df: DataFrame or array - Underlying data structure
        - source: DatasetSource - Source information
        - targets: str or list, optional - Target column names
        - name: str, optional - Dataset name
        - digest: str, optional - Content digest for versioning
        - path: str, optional - Storage path
        """

    def to_pandas(self):
        """
        Convert dataset to pandas DataFrame.

        Returns:
        pandas.DataFrame - DataFrame representation
        """

    def to_numpy(self):
        """
        Convert dataset to numpy arrays.

        Returns:
        Tuple of (features, targets) as numpy arrays
        """

    def to_dict(self):
        """
        Convert dataset to dictionary representation.

        Returns:
        dict - Dictionary with dataset metadata and data
        """

class DatasetSource:
    def __init__(self):
        """Base class for dataset sources."""

    def load(self, dst_path=None):
        """
        Load dataset from source.

        Parameters:
        - dst_path: str, optional - Destination path for loading

        Returns:
        Loaded dataset content
        """

    def to_json(self):
        """
        Serialize source to JSON.

        Returns:
        str - JSON representation of source
        """

    @staticmethod
    def from_json(source_json):
        """
        Reconstruct source from JSON.

        Parameters:
        - source_json: str - JSON representation

        Returns:
        DatasetSource object
        """
```
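
To see the conversion methods in context, the sketch below materializes a small dataset in its different representations (a minimal example following the `Dataset` interface above; the exact keys returned by `to_dict` depend on the dataset type, so treat them as illustrative):

```python
import mlflow.data
import pandas as pd

df = pd.DataFrame({"x": [1.0, 2.0], "y": [0, 1]})
dataset = mlflow.data.from_pandas(df, targets="y", name="conversion_demo")

# Materialize as a pandas DataFrame
frame = dataset.to_pandas()
print(frame.shape)

# Inspect the metadata-bearing dictionary form
meta = dataset.to_dict()
print(sorted(meta.keys()))
```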

### Meta Dataset Operations

Advanced dataset operations for managing dataset relationships, transformations, and metadata.

```python { .api }
class MetaDataset:
    def __init__(self, datasets, name=None, digest=None, tags=None):
        """
        Meta dataset combining multiple datasets.

        Parameters:
        - datasets: list - List of Dataset objects
        - name: str, optional - Meta dataset name
        - digest: str, optional - Combined digest
        - tags: dict, optional - Meta dataset tags
        """

    def add_dataset(self, dataset, context=None):
        """
        Add dataset to meta dataset.

        Parameters:
        - dataset: Dataset - Dataset to add
        - context: str, optional - Context for dataset usage
        """

    def get_datasets(self, context=None):
        """
        Get datasets by context.

        Parameters:
        - context: str, optional - Filter by context

        Returns:
        List of Dataset objects
        """
```

## Usage Examples

### Basic Dataset Creation and Logging

```python
import mlflow
import mlflow.data
import pandas as pd
import numpy as np

# Create sample data
data = {
    'feature1': np.random.randn(1000),
    'feature2': np.random.randn(1000),
    'feature3': np.random.randn(1000),
    'target': np.random.randint(0, 2, 1000)
}
df = pd.DataFrame(data)

# Create dataset from pandas DataFrame
dataset = mlflow.data.from_pandas(
    df=df,
    targets="target",
    name="classification_dataset"
)

# Log dataset to MLflow run
with mlflow.start_run():
    # Log as input dataset
    mlflow.log_input(dataset, context="training")

    # Train model and log metrics
    mlflow.log_metric("dataset_size", len(df))
    mlflow.log_metric("num_features", len(df.columns) - 1)

print(f"Dataset name: {dataset.name}")
print(f"Dataset source: {dataset.source}")
print(f"Dataset digest: {dataset.digest}")
```

### Working with Different Data Sources

```python
import mlflow
import mlflow.data
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Pandas DataFrame
pandas_df = pd.read_csv("data/train.csv")
pandas_dataset = mlflow.data.from_pandas(
    df=pandas_df,
    targets="target",
    name="pandas_training_data"
)

# Numpy arrays
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)
numpy_dataset = mlflow.data.from_numpy(
    features=X,
    targets=y,
    name="numpy_training_data"
)

# Spark DataFrame
spark = SparkSession.builder.getOrCreate()
spark_df = spark.read.format("delta").load("/path/to/delta/table")
spark_dataset = mlflow.data.from_spark(
    df=spark_df,
    targets="target",
    name="spark_training_data"
)

# Delta table with time travel
delta_dataset = mlflow.data.from_delta(
    table_name="ml_datasets.training_data",
    version=5,  # Specific version
    targets="target",
    name="delta_training_data_v5"
)

# Hugging Face dataset
hf_dataset = mlflow.data.from_huggingface(
    path="imdb",
    targets="label",
    name="imdb_sentiment_data"
)

# Log multiple datasets
with mlflow.start_run():
    mlflow.log_input(pandas_dataset, context="training")
    mlflow.log_input(numpy_dataset, context="validation")
    mlflow.log_input(spark_dataset, context="testing")
```

### Dataset Versioning and Lineage

```python
import mlflow
import mlflow.data
import pandas as pd

# Original dataset
original_df = pd.read_csv("raw_data.csv")
original_dataset = mlflow.data.from_pandas(
    df=original_df,
    name="raw_customer_data",
    path="/data/raw/customers.csv"
)

# Preprocessed dataset
def preprocess_data(df):
    # Data cleaning and feature engineering
    df_clean = df.dropna().copy()
    df_clean['feature_engineered'] = df_clean['feature1'] * df_clean['feature2']
    return df_clean

preprocessed_df = preprocess_data(original_df)
preprocessed_dataset = mlflow.data.from_pandas(
    df=preprocessed_df,
    name="preprocessed_customer_data",
    path="/data/processed/customers.csv"
)

# Training split
train_df = preprocessed_df.sample(frac=0.8, random_state=42)
train_dataset = mlflow.data.from_pandas(
    df=train_df,
    targets="target",
    name="training_split"
)

# Validation split
val_df = preprocessed_df.drop(train_df.index)
val_dataset = mlflow.data.from_pandas(
    df=val_df,
    targets="target",
    name="validation_split"
)

# Log complete data lineage
with mlflow.start_run() as run:
    # Log all datasets with context
    mlflow.log_input(original_dataset, context="raw")
    mlflow.log_input(preprocessed_dataset, context="processed")
    mlflow.log_input(train_dataset, context="training")
    mlflow.log_input(val_dataset, context="validation")

    # Log preprocessing parameters
    mlflow.log_param("preprocessing_steps", "dropna,feature_engineering")
    mlflow.log_param("train_size", len(train_df))
    mlflow.log_param("val_size", len(val_df))

# Dataset digests for reproducibility
print(f"Training data digest: {train_dataset.digest}")
print(f"Validation data digest: {val_dataset.digest}")
```
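
Continuing the example above, recorded digests make drift detection straightforward: rebuild the dataset from the same source and compare digests. A minimal sketch, assuming `raw_data.csv` has not been rewritten in the meantime:

```python
# Re-create the dataset from the same file and compare content digests
reloaded_df = pd.read_csv("raw_data.csv")
reloaded_dataset = mlflow.data.from_pandas(
    df=reloaded_df,
    name="raw_customer_data",
    path="/data/raw/customers.csv"
)

if reloaded_dataset.digest == original_dataset.digest:
    print("Data unchanged since the logged run")
else:
    print("Data has drifted; digests differ")
```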

### Custom Dataset Sources

```python
import json

import pandas as pd

import mlflow.data
from mlflow.data.dataset_source import DatasetSource

class S3DatasetSource(DatasetSource):
    """Custom dataset source for S3 data."""

    def __init__(self, s3_path, credentials=None):
        self.s3_path = s3_path
        self.credentials = credentials

    def load(self, dst_path=None):
        """Load dataset from S3 (pandas reads s3:// URIs via s3fs)."""
        return pd.read_csv(self.s3_path)

    def to_json(self):
        """Serialize to a JSON string."""
        return json.dumps({
            "s3_path": self.s3_path,
            "source_type": "s3"
        })

    @staticmethod
    def from_json(source_json):
        """Deserialize from a JSON string."""
        return S3DatasetSource(json.loads(source_json)["s3_path"])

# Create dataset with custom source
s3_source = S3DatasetSource("s3://my-bucket/data/train.csv")
s3_dataset = mlflow.data.Dataset(
    source=s3_source,
    name="s3_training_data"
)

# Register custom source
mlflow.data.dataset_source_registry.register_source(S3DatasetSource)
```
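
Since sources are reconstructed from their JSON form when lineage is rebuilt, a quick sanity check for a custom source is to round-trip it through `to_json`/`from_json` (continuing with the `S3DatasetSource` sketch above):

```python
# Round-trip the custom source through its JSON representation
serialized = s3_source.to_json()
restored = S3DatasetSource.from_json(serialized)

assert restored.s3_path == s3_source.s3_path
print(f"Restored source path: {restored.s3_path}")
```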

### Meta Dataset Operations

```python
import mlflow
import mlflow.data

# Create individual datasets (train_df, val_df, and test_df are assumed
# to come from an earlier splitting step)
train_dataset = mlflow.data.from_pandas(train_df, targets="target", name="train")
val_dataset = mlflow.data.from_pandas(val_df, targets="target", name="validation")
test_dataset = mlflow.data.from_pandas(test_df, targets="target", name="test")

# Create meta dataset
meta_dataset = mlflow.data.MetaDataset(
    datasets=[train_dataset, val_dataset, test_dataset],
    name="complete_ml_dataset",
    tags={"project": "customer_classification", "version": "v2"}
)

# Add context to datasets
meta_dataset.add_dataset(train_dataset, context="training")
meta_dataset.add_dataset(val_dataset, context="validation")
meta_dataset.add_dataset(test_dataset, context="testing")

# Log meta dataset
with mlflow.start_run():
    mlflow.log_input(meta_dataset, context="complete_pipeline")

# Access datasets by context
training_datasets = meta_dataset.get_datasets(context="training")
validation_datasets = meta_dataset.get_datasets(context="validation")

print(f"Training datasets: {len(training_datasets)}")
print(f"Validation datasets: {len(validation_datasets)}")
```

### Dataset Schema Validation

```python
import mlflow
import mlflow.data
import pandas as pd
from mlflow.models import infer_signature

# Create dataset with schema validation
df = pd.DataFrame({
    'feature1': [1.0, 2.0, 3.0],
    'feature2': ['A', 'B', 'C'],
    'target': [0, 1, 0]
})

dataset = mlflow.data.from_pandas(
    df=df,
    targets="target",
    name="schema_validated_data"
)

# Infer and validate schema
expected_schema = infer_signature(df.drop('target', axis=1), df['target'])

# Validate new data against the inferred input schema
new_df = pd.DataFrame({
    'feature1': [4.0, 5.0],
    'feature2': ['D', 'E'],
    'target': [1, 0]
})

try:
    from mlflow.models import validate_schema
    validate_schema(new_df.drop('target', axis=1), expected_schema.inputs)
    print("Schema validation passed")
except Exception as e:
    print(f"Schema validation failed: {e}")

# Log with schema validation
with mlflow.start_run():
    mlflow.log_input(dataset, context="training")
    mlflow.log_param("schema_validation", "enabled")
```

## Types

```python { .api }
from typing import Any, Dict, List, Optional

from mlflow.data.dataset import Dataset
from mlflow.data.dataset_source import DatasetSource
from mlflow.data.meta_dataset import MetaDataset
from mlflow.entities import Dataset as DatasetEntity, DatasetInput, InputTag

class Dataset:
    df: Any
    source: DatasetSource
    targets: Optional[str]
    name: Optional[str]
    digest: Optional[str]
    path: Optional[str]

class DatasetSource:
    def load(self, dst_path: Optional[str] = None) -> Any: ...
    def to_json(self) -> str: ...

    @staticmethod
    def from_json(source_json: str) -> 'DatasetSource': ...

class DatasetEntity:
    name: str
    digest: str
    source_type: str
    source: str
    schema: Optional[str]
    profile: Optional[str]

class DatasetInput:
    dataset: DatasetEntity
    tags: List[InputTag]

class MetaDataset:
    datasets: List[Dataset]
    name: Optional[str]
    digest: Optional[str]
    tags: Optional[Dict[str, str]]

# Built-in dataset source types
class PandasDatasetSource(DatasetSource):
    path: Optional[str]

class NumpyDatasetSource(DatasetSource):
    path: Optional[str]

class SparkDatasetSource(DatasetSource):
    path: Optional[str]

class DeltaDatasetSource(DatasetSource):
    table_name: str
    version: Optional[int]
    timestamp: Optional[str]

class HuggingFaceDatasetSource(DatasetSource):
    path: str
    revision: Optional[str]
```