MLflow is an open source platform for the complete machine learning lifecycle
MLflow's data management capabilities provide dataset tracking, lineage, and versioning for machine learning workflows. The system supports a variety of data sources and formats, and manages the dataset lifecycle with automatic schema inference and validation.
Core functions for creating and loading datasets from various sources with automatic schema inference and metadata capture.
def from_pandas(df, source=None, targets=None, name=None, digest=None, path=None):
"""
Create Dataset from pandas DataFrame.
Parameters:
- df: pandas.DataFrame - Source DataFrame
- source: DatasetSource, optional - Dataset source information
- targets: str or list, optional - Target column names
- name: str, optional - Dataset name
- digest: str, optional - Dataset content digest
- path: str, optional - Path for dataset storage
Returns:
Dataset object wrapping the DataFrame
"""
def from_numpy(features, source=None, targets=None, name=None, digest=None, path=None):
"""
Create Dataset from numpy arrays.
Parameters:
- features: numpy.ndarray - Feature array
- source: DatasetSource, optional - Dataset source information
- targets: numpy.ndarray, optional - Target array
- name: str, optional - Dataset name
- digest: str, optional - Dataset content digest
- path: str, optional - Path for dataset storage
Returns:
Dataset object wrapping the arrays
"""
def from_spark(df, source=None, targets=None, name=None, digest=None, path=None):
"""
Create Dataset from Spark DataFrame.
Parameters:
- df: pyspark.sql.DataFrame - Source Spark DataFrame
- source: DatasetSource, optional - Dataset source information
- targets: str or list, optional - Target column names
- name: str, optional - Dataset name
- digest: str, optional - Dataset content digest
- path: str, optional - Path for dataset storage
Returns:
Dataset object wrapping the Spark DataFrame
"""
def from_delta(table_name, version=None, timestamp=None, source=None, targets=None, name=None, digest=None, path=None):
"""
Create Dataset from Delta table.
Parameters:
- table_name: str - Delta table name or path
- version: int, optional - Specific table version
- timestamp: str, optional - Time travel timestamp
- source: DatasetSource, optional - Dataset source information
- targets: str or list, optional - Target column names
- name: str, optional - Dataset name
- digest: str, optional - Dataset content digest
- path: str, optional - Path for dataset storage
Returns:
Dataset object referencing Delta table
"""
def from_huggingface(path, source=None, targets=None, name=None, digest=None):
"""
Create Dataset from Hugging Face dataset.
Parameters:
- path: str - Hugging Face dataset path or name
- source: DatasetSource, optional - Dataset source information
- targets: str or list, optional - Target column names
- name: str, optional - Dataset name
- digest: str, optional - Dataset content digest
Returns:
Dataset object wrapping Hugging Face dataset
"""
def load_delta(table_name, version=None, timestamp=None):
"""
Load Delta table as Dataset.
Parameters:
- table_name: str - Delta table name or path
- version: int, optional - Specific table version
- timestamp: str, optional - Time travel timestamp
Returns:
Dataset object with Delta table data
"""Functions for managing dataset sources, registering datasets, and maintaining dataset metadata.
def get_source(dataset):
"""
Get source information from dataset.
Parameters:
- dataset: Dataset, DatasetEntity, or DatasetInput - Dataset object
Returns:
DatasetSource object with source metadata
"""
def get_registered_sources():
"""
Get all registered dataset source types.
Returns:
List of registered DatasetSource classes
"""
def get_dataset_source_from_json(source_json, source_type):
"""
Reconstruct DatasetSource from JSON representation.
Parameters:
- source_json: str - JSON representation of source
- source_type: str - Type of dataset source
Returns:
DatasetSource object reconstructed from JSON
"""Functions for logging datasets to MLflow runs with automatic lineage tracking and metadata capture.
def log_input(dataset, context=None, tags=None):
"""
Log dataset as input to current MLflow run.
Parameters:
- dataset: Dataset - Dataset to log as input
- context: str, optional - Context for dataset usage (train, validation, test)
- tags: dict, optional - Additional tags for dataset input
"""Classes and functions for dataset schema management, validation, and type checking.
class Dataset:
def __init__(self, df=None, source=None, targets=None, name=None, digest=None, path=None):
"""
Core Dataset class for data management.
Parameters:
- df: DataFrame or array - Underlying data structure
- source: DatasetSource - Source information
- targets: str or list, optional - Target column names
- name: str, optional - Dataset name
- digest: str, optional - Content digest for versioning
- path: str, optional - Storage path
"""
def to_pandas(self):
"""
Convert dataset to pandas DataFrame.
Returns:
pandas.DataFrame - DataFrame representation
"""
def to_numpy(self):
"""
Convert dataset to numpy arrays.
Returns:
Tuple of (features, targets) as numpy arrays
"""
def to_dict(self):
"""
Convert dataset to dictionary representation.
Returns:
dict - Dictionary with dataset metadata and data
"""
class DatasetSource:
def __init__(self):
"""Base class for dataset sources."""
def load(self, dst_path=None):
"""
Load dataset from source.
Parameters:
- dst_path: str, optional - Destination path for loading
Returns:
Loaded dataset content
"""
def to_json(self):
"""
Serialize source to JSON.
Returns:
str - JSON representation of source
"""
@staticmethod
def from_json(source_json):
"""
Reconstruct source from JSON.
Parameters:
- source_json: str - JSON representation
Returns:
DatasetSource object
"""Advanced dataset operations for managing dataset relationships, transformations, and metadata.
class MetaDataset:
def __init__(self, datasets, name=None, digest=None, tags=None):
"""
Meta dataset combining multiple datasets.
Parameters:
- datasets: list - List of Dataset objects
- name: str, optional - Meta dataset name
- digest: str, optional - Combined digest
- tags: dict, optional - Meta dataset tags
"""
def add_dataset(self, dataset, context=None):
"""
Add dataset to meta dataset.
Parameters:
- dataset: Dataset - Dataset to add
- context: str, optional - Context for dataset usage
"""
def get_datasets(self, context=None):
"""
Get datasets by context.
Parameters:
- context: str, optional - Filter by context
Returns:
List of Dataset objects
"""import mlflow
import mlflow.data
import pandas as pd
import numpy as np
# Create sample data
data = {
'feature1': np.random.randn(1000),
'feature2': np.random.randn(1000),
'feature3': np.random.randn(1000),
'target': np.random.randint(0, 2, 1000)
}
df = pd.DataFrame(data)
# Create dataset from pandas DataFrame
dataset = mlflow.data.from_pandas(
df=df,
targets="target",
name="classification_dataset"
)
# Log dataset to MLflow run
with mlflow.start_run():
# Log as input dataset
mlflow.data.log_input(dataset, context="training")
# Train model and log metrics
mlflow.log_metric("dataset_size", len(df))
mlflow.log_metric("num_features", len(df.columns) - 1)
print(f"Dataset name: {dataset.name}")
print(f"Dataset source: {dataset.source}")
print(f"Dataset digest: {dataset.digest}")import mlflow.data
import mlflow.data
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
# Pandas DataFrame
pandas_df = pd.read_csv("data/train.csv")
pandas_dataset = mlflow.data.from_pandas(
df=pandas_df,
targets="target",
name="pandas_training_data"
)
# Numpy arrays
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)
numpy_dataset = mlflow.data.from_numpy(
features=X,
targets=y,
name="numpy_training_data"
)
# Spark DataFrame
spark = SparkSession.builder.getOrCreate()
spark_df = spark.read.format("delta").load("/path/to/delta/table")
spark_dataset = mlflow.data.from_spark(
df=spark_df,
targets="target",
name="spark_training_data"
)
# Delta table with time travel
delta_dataset = mlflow.data.from_delta(
table_name="ml_datasets.training_data",
version=5, # Specific version
targets="target",
name="delta_training_data_v5"
)
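# Alternatively, if the Delta table only needs to be read rather than wrapped with
# extra configuration, load_delta (documented above) is the shorter path.
# A sketch; the same table name and version are assumed:
delta_loaded = mlflow.data.load_delta(
    table_name="ml_datasets.training_data",
    version=5
)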
# Hugging Face dataset
hf_dataset = mlflow.data.from_huggingface(
path="imdb",
targets="label",
name="imdb_sentiment_data"
)
# Log multiple datasets
with mlflow.start_run():
mlflow.data.log_input(pandas_dataset, context="training")
mlflow.data.log_input(numpy_dataset, context="validation")
mlflow.data.log_input(spark_dataset, context="testing")
import mlflow
import mlflow.data
# Original dataset
original_df = pd.read_csv("raw_data.csv")
original_dataset = mlflow.data.from_pandas(
df=original_df,
name="raw_customer_data",
path="/data/raw/customers.csv"
)
# Preprocessed dataset
def preprocess_data(df):
# Data cleaning and feature engineering
df_clean = df.dropna()
df_clean['feature_engineered'] = df_clean['feature1'] * df_clean['feature2']
return df_clean
preprocessed_df = preprocess_data(original_df)
preprocessed_dataset = mlflow.data.from_pandas(
df=preprocessed_df,
name="preprocessed_customer_data",
path="/data/processed/customers.csv"
)
# Training split
train_df = preprocessed_df.sample(frac=0.8, random_state=42)
train_dataset = mlflow.data.from_pandas(
df=train_df,
targets="target",
name="training_split"
)
# Validation split
val_df = preprocessed_df.drop(train_df.index)
val_dataset = mlflow.data.from_pandas(
df=val_df,
targets="target",
name="validation_split"
)
# Log complete data lineage
with mlflow.start_run() as run:
# Log all datasets with context
mlflow.data.log_input(original_dataset, context="raw")
mlflow.data.log_input(preprocessed_dataset, context="processed")
mlflow.data.log_input(train_dataset, context="training")
mlflow.data.log_input(val_dataset, context="validation")
# Log preprocessing parameters
mlflow.log_param("preprocessing_steps", "dropna,feature_engineering")
mlflow.log_param("train_size", len(train_df))
mlflow.log_param("val_size", len(val_df))
# Dataset digests for reproducibility
print(f"Training data digest: {train_dataset.digest}")
print(f"Validation data digest: {val_dataset.digest}")import mlflow.data
import mlflow.data
from mlflow.data.dataset_source import DatasetSource
class S3DatasetSource(DatasetSource):
    """Custom dataset source for data stored in S3."""
    def __init__(self, s3_path, credentials=None):
        self.s3_path = s3_path
        self.credentials = credentials
    def load(self, dst_path=None):
        """Load the dataset from S3 (pandas reads s3:// paths via s3fs; a boto3-based download could be substituted)."""
        import pandas as pd
        return pd.read_csv(self.s3_path)
    def to_json(self):
        """Serialize the source to a JSON string, matching the DatasetSource contract."""
        import json
        return json.dumps({"s3_path": self.s3_path, "source_type": "s3"})
    @staticmethod
    def from_json(source_json):
        """Reconstruct the source from its JSON string representation."""
        import json
        return S3DatasetSource(json.loads(source_json)["s3_path"])
# Create dataset with custom source
s3_source = S3DatasetSource("s3://my-bucket/data/train.csv")
s3_dataset = mlflow.data.Dataset(
source=s3_source,
name="s3_training_data"
)
# Register custom source
mlflow.data.dataset_source_registry.register_source(S3DatasetSource)
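After registration, the custom source type should appear alongside the built-in ones; get_registered_sources (documented above) can confirm this. A minimal sketch:
# List all registered dataset source classes, including S3DatasetSource
for source_cls in mlflow.data.get_registered_sources():
    print(source_cls.__name__)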
import mlflow.data
# Create individual datasets
train_dataset = mlflow.data.from_pandas(train_df, targets="target", name="train")
val_dataset = mlflow.data.from_pandas(val_df, targets="target", name="validation")
test_dataset = mlflow.data.from_pandas(test_df, targets="target", name="test")
# Create meta dataset
meta_dataset = mlflow.data.MetaDataset(
datasets=[train_dataset, val_dataset, test_dataset],
name="complete_ml_dataset",
tags={"project": "customer_classification", "version": "v2"}
)
# Add context to datasets
meta_dataset.add_dataset(train_dataset, context="training")
meta_dataset.add_dataset(val_dataset, context="validation")
meta_dataset.add_dataset(test_dataset, context="testing")
# Log meta dataset
with mlflow.start_run():
mlflow.data.log_input(meta_dataset, context="complete_pipeline")
# Access datasets by context
training_datasets = meta_dataset.get_datasets(context="training")
validation_datasets = meta_dataset.get_datasets(context="validation")
print(f"Training datasets: {len(training_datasets)}")
print(f"Validation datasets: {len(validation_datasets)}")import mlflow.data
import mlflow.data
import pandas as pd
from mlflow.models import infer_signature
# Create dataset with schema validation
df = pd.DataFrame({
'feature1': [1.0, 2.0, 3.0],
'feature2': ['A', 'B', 'C'],
'target': [0, 1, 0]
})
dataset = mlflow.data.from_pandas(
df=df,
targets="target",
name="schema_validated_data"
)
# Infer and validate schema
expected_schema = infer_signature(df.drop('target', axis=1), df['target'])
# Validate new data against schema
new_df = pd.DataFrame({
'feature1': [4.0, 5.0],
'feature2': ['D', 'E'],
'target': [1, 0]
})
try:
from mlflow.models import validate_schema
validate_schema(new_df.drop('target', axis=1), expected_schema.inputs)
print("Schema validation passed")
except Exception as e:
print(f"Schema validation failed: {e}")
# Log with schema validation
with mlflow.start_run():
mlflow.data.log_input(dataset, context="training")
mlflow.log_param("schema_validation", "enabled")from mlflow.data.dataset import Dataset
from mlflow.data.dataset_source import DatasetSource
from mlflow.entities import Dataset as DatasetEntity, DatasetInput, InputTag
from mlflow.data.meta_dataset import MetaDataset
class Dataset:
df: Any
source: DatasetSource
targets: Optional[str]
name: Optional[str]
digest: Optional[str]
path: Optional[str]
class DatasetSource:
def load(self, dst_path: Optional[str] = None) -> Any: ...
def to_json(self) -> str: ...
@staticmethod
def from_json(source_json: str) -> 'DatasetSource': ...
class DatasetEntity:
name: str
digest: str
source_type: str
source: str
schema: Optional[str]
profile: Optional[str]
class DatasetInput:
dataset: DatasetEntity
tags: List[InputTag]
class MetaDataset:
datasets: List[Dataset]
name: Optional[str]
digest: Optional[str]
tags: Optional[Dict[str, str]]
# Built-in dataset source types
class PandasDatasetSource(DatasetSource):
path: Optional[str]
class NumpyDatasetSource(DatasetSource):
path: Optional[str]
class SparkDatasetSource(DatasetSource):
path: Optional[str]
class DeltaDatasetSource(DatasetSource):
table_name: str
version: Optional[int]
timestamp: Optional[str]
class HuggingFaceDatasetSource(DatasetSource):
path: str
revision: Optional[str]
Install with Tessl CLI
npx tessl i tessl/pypi-mlflow