or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.mdconfiguration.mddata.mdframeworks.mdgenai.mdindex.mdmodels.mdprojects.mdtracing.mdtracking.md

data.mddocs/

0

# Data Management

1

2

MLflow's data management capabilities provide comprehensive dataset tracking, lineage, and versioning for machine learning workflows. The system supports various data sources, formats, and provides robust dataset lifecycle management with automated schema inference and validation.

3

4

## Capabilities

5

6

### Dataset Creation and Loading

7

8

Core functions for creating and loading datasets from various sources with automatic schema inference and metadata capture.

9

10

```python { .api }

11

def from_pandas(df, source=None, targets=None, name=None, digest=None, path=None):

12

"""

13

Create Dataset from pandas DataFrame.

14

15

Parameters:

16

- df: pandas.DataFrame - Source DataFrame

17

- source: DatasetSource, optional - Dataset source information

18

- targets: str or list, optional - Target column names

19

- name: str, optional - Dataset name

20

- digest: str, optional - Dataset content digest

21

- path: str, optional - Path for dataset storage

22

23

Returns:

24

Dataset object wrapping the DataFrame

25

"""

26

27

def from_numpy(features, source=None, targets=None, name=None, digest=None, path=None):

28

"""

29

Create Dataset from numpy arrays.

30

31

Parameters:

32

- features: numpy.ndarray - Feature array

33

- source: DatasetSource, optional - Dataset source information

34

- targets: numpy.ndarray or str, optional - Target array or column name

35

- name: str, optional - Dataset name

36

- digest: str, optional - Dataset content digest

37

- path: str, optional - Path for dataset storage

38

39

Returns:

40

Dataset object wrapping the arrays

41

"""

42

43

def from_spark(df, source=None, targets=None, name=None, digest=None, path=None):

44

"""

45

Create Dataset from Spark DataFrame.

46

47

Parameters:

48

- df: pyspark.sql.DataFrame - Source Spark DataFrame

49

- source: DatasetSource, optional - Dataset source information

50

- targets: str or list, optional - Target column names

51

- name: str, optional - Dataset name

52

- digest: str, optional - Dataset content digest

53

- path: str, optional - Path for dataset storage

54

55

Returns:

56

Dataset object wrapping the Spark DataFrame

57

"""

58

59

def from_delta(table_name, version=None, timestamp=None, source=None, targets=None, name=None, digest=None, path=None):

60

"""

61

Create Dataset from Delta table.

62

63

Parameters:

64

- table_name: str - Delta table name or path

65

- version: int, optional - Specific table version

66

- timestamp: str, optional - Time travel timestamp

67

- source: DatasetSource, optional - Dataset source information

68

- targets: str or list, optional - Target column names

69

- name: str, optional - Dataset name

70

- digest: str, optional - Dataset content digest

71

- path: str, optional - Path for dataset storage

72

73

Returns:

74

Dataset object referencing Delta table

75

"""

76

77

def from_huggingface(path, source=None, targets=None, name=None, digest=None):

78

"""

79

Create Dataset from Hugging Face dataset.

80

81

Parameters:

82

- path: str - Hugging Face dataset path or name

83

- source: DatasetSource, optional - Dataset source information

84

- targets: str or list, optional - Target column names

85

- name: str, optional - Dataset name

86

- digest: str, optional - Dataset content digest

87

88

Returns:

89

Dataset object wrapping Hugging Face dataset

90

"""

91

92

def load_delta(table_name, version=None, timestamp=None):

93

"""

94

Load Delta table as Dataset.

95

96

Parameters:

97

- table_name: str - Delta table name or path

98

- version: int, optional - Specific table version

99

- timestamp: str, optional - Time travel timestamp

100

101

Returns:

102

Dataset object with Delta table data

103

"""

104

```

105

106

### Dataset Sources and Registry

107

108

Functions for managing dataset sources, registering datasets, and maintaining dataset metadata.

109

110

```python { .api }

111

def get_source(dataset):

112

"""

113

Get source information from dataset.

114

115

Parameters:

116

- dataset: Dataset, DatasetEntity, or DatasetInput - Dataset object

117

118

Returns:

119

DatasetSource object with source metadata

120

"""

121

122

def get_registered_sources():

123

"""

124

Get all registered dataset source types.

125

126

Returns:

127

List of registered DatasetSource classes

128

"""

129

130

def get_dataset_source_from_json(source_json, source_type):

131

"""

132

Reconstruct DatasetSource from JSON representation.

133

134

Parameters:

135

- source_json: str - JSON representation of source

136

- source_type: str - Type of dataset source

137

138

Returns:

139

DatasetSource object reconstructed from JSON

140

"""

141

```

142

143

### Dataset Logging and Tracking

144

145

Functions for logging datasets to MLflow runs with automatic lineage tracking and metadata capture.

146

147

```python { .api }

148

def log_input(dataset, context=None, tags=None):

149

"""

150

Log dataset as input to current MLflow run.

151

152

Parameters:

153

- dataset: Dataset - Dataset to log as input

154

- context: str, optional - Context for dataset usage (train, validation, test)

155

- tags: dict, optional - Additional tags for dataset input

156

"""

157

```

158

159

### Dataset Validation and Schema

160

161

Classes and functions for dataset schema management, validation, and type checking.

162

163

```python { .api }

164

class Dataset:

165

def __init__(self, df=None, source=None, targets=None, name=None, digest=None, path=None):

166

"""

167

Core Dataset class for data management.

168

169

Parameters:

170

- df: DataFrame or array - Underlying data structure

171

- source: DatasetSource - Source information

172

- targets: str or list, optional - Target column names

173

- name: str, optional - Dataset name

174

- digest: str, optional - Content digest for versioning

175

- path: str, optional - Storage path

176

"""

177

178

def to_pandas(self):

179

"""

180

Convert dataset to pandas DataFrame.

181

182

Returns:

183

pandas.DataFrame - DataFrame representation

184

"""

185

186

def to_numpy(self):

187

"""

188

Convert dataset to numpy arrays.

189

190

Returns:

191

Tuple of (features, targets) as numpy arrays

192

"""

193

194

def to_dict(self):

195

"""

196

Convert dataset to dictionary representation.

197

198

Returns:

199

dict - Dictionary with dataset metadata and data

200

"""

201

202

class DatasetSource:

203

def __init__(self):

204

"""Base class for dataset sources."""

205

206

def load(self, dst_path=None):

207

"""

208

Load dataset from source.

209

210

Parameters:

211

- dst_path: str, optional - Destination path for loading

212

213

Returns:

214

Loaded dataset content

215

"""

216

217

def to_json(self):

218

"""

219

Serialize source to JSON.

220

221

Returns:

222

str - JSON representation of source

223

"""

224

225

@staticmethod

226

def from_json(source_json):

227

"""

228

Reconstruct source from JSON.

229

230

Parameters:

231

- source_json: str - JSON representation

232

233

Returns:

234

DatasetSource object

235

"""

236

```

237

238

### Meta Dataset Operations

239

240

Advanced dataset operations for managing dataset relationships, transformations, and metadata.

241

242

```python { .api }

243

class MetaDataset:

244

def __init__(self, datasets, name=None, digest=None, tags=None):

245

"""

246

Meta dataset combining multiple datasets.

247

248

Parameters:

249

- datasets: list - List of Dataset objects

250

- name: str, optional - Meta dataset name

251

- digest: str, optional - Combined digest

252

- tags: dict, optional - Meta dataset tags

253

"""

254

255

def add_dataset(self, dataset, context=None):

256

"""

257

Add dataset to meta dataset.

258

259

Parameters:

260

- dataset: Dataset - Dataset to add

261

- context: str, optional - Context for dataset usage

262

"""

263

264

def get_datasets(self, context=None):

265

"""

266

Get datasets by context.

267

268

Parameters:

269

- context: str, optional - Filter by context

270

271

Returns:

272

List of Dataset objects

273

"""

274

```

275

276

## Usage Examples

277

278

### Basic Dataset Creation and Logging

279

280

```python

281

import mlflow

282

import mlflow.data

283

import pandas as pd

284

import numpy as np

285

286

# Create sample data

287

data = {

288

'feature1': np.random.randn(1000),

289

'feature2': np.random.randn(1000),

290

'feature3': np.random.randn(1000),

291

'target': np.random.randint(0, 2, 1000)

292

}

293

df = pd.DataFrame(data)

294

295

# Create dataset from pandas DataFrame

296

dataset = mlflow.data.from_pandas(

297

df=df,

298

targets="target",

299

name="classification_dataset"

300

)

301

302

# Log dataset to MLflow run

303

with mlflow.start_run():

304

# Log as input dataset

305

mlflow.data.log_input(dataset, context="training")

306

307

# Train model and log metrics

308

mlflow.log_metric("dataset_size", len(df))

309

mlflow.log_metric("num_features", len(df.columns) - 1)

310

311

print(f"Dataset name: {dataset.name}")

312

print(f"Dataset source: {dataset.source}")

313

print(f"Dataset digest: {dataset.digest}")

314

```

315

316

### Working with Different Data Sources

317

318

```python

319

import mlflow.data

320

from pyspark.sql import SparkSession

321

322

# Pandas DataFrame

323

pandas_df = pd.read_csv("data/train.csv")

324

pandas_dataset = mlflow.data.from_pandas(

325

df=pandas_df,

326

targets="target",

327

name="pandas_training_data"

328

)

329

330

# Numpy arrays

331

X = np.random.randn(1000, 10)

332

y = np.random.randint(0, 2, 1000)

333

numpy_dataset = mlflow.data.from_numpy(

334

features=X,

335

targets=y,

336

name="numpy_training_data"

337

)

338

339

# Spark DataFrame

340

spark = SparkSession.builder.getOrCreate()

341

spark_df = spark.read.format("delta").load("/path/to/delta/table")

342

spark_dataset = mlflow.data.from_spark(

343

df=spark_df,

344

targets="target",

345

name="spark_training_data"

346

)

347

348

# Delta table with time travel

349

delta_dataset = mlflow.data.from_delta(

350

table_name="ml_datasets.training_data",

351

version=5, # Specific version

352

targets="target",

353

name="delta_training_data_v5"

354

)

355

356

# Hugging Face dataset

357

hf_dataset = mlflow.data.from_huggingface(

358

path="imdb",

359

targets="label",

360

name="imdb_sentiment_data"

361

)

362

363

# Log multiple datasets

364

with mlflow.start_run():

365

mlflow.data.log_input(pandas_dataset, context="training")

366

mlflow.data.log_input(numpy_dataset, context="validation")

367

mlflow.data.log_input(spark_dataset, context="testing")

368

```

369

370

### Dataset Versioning and Lineage

371

372

```python

373

import mlflow

374

import mlflow.data

375

376

# Original dataset

377

original_df = pd.read_csv("raw_data.csv")

378

original_dataset = mlflow.data.from_pandas(

379

df=original_df,

380

name="raw_customer_data",

381

path="/data/raw/customers.csv"

382

)

383

384

# Preprocessed dataset

385

def preprocess_data(df):

386

# Data cleaning and feature engineering

387

df_clean = df.dropna()

388

df_clean['feature_engineered'] = df_clean['feature1'] * df_clean['feature2']

389

return df_clean

390

391

preprocessed_df = preprocess_data(original_df)

392

preprocessed_dataset = mlflow.data.from_pandas(

393

df=preprocessed_df,

394

name="preprocessed_customer_data",

395

path="/data/processed/customers.csv"

396

)

397

398

# Training split

399

train_df = preprocessed_df.sample(frac=0.8, random_state=42)

400

train_dataset = mlflow.data.from_pandas(

401

df=train_df,

402

targets="target",

403

name="training_split"

404

)

405

406

# Validation split

407

val_df = preprocessed_df.drop(train_df.index)

408

val_dataset = mlflow.data.from_pandas(

409

df=val_df,

410

targets="target",

411

name="validation_split"

412

)

413

414

# Log complete data lineage

415

with mlflow.start_run() as run:

416

# Log all datasets with context

417

mlflow.data.log_input(original_dataset, context="raw")

418

mlflow.data.log_input(preprocessed_dataset, context="processed")

419

mlflow.data.log_input(train_dataset, context="training")

420

mlflow.data.log_input(val_dataset, context="validation")

421

422

# Log preprocessing parameters

423

mlflow.log_param("preprocessing_steps", "dropna,feature_engineering")

424

mlflow.log_param("train_size", len(train_df))

425

mlflow.log_param("val_size", len(val_df))

426

427

# Dataset digests for reproducibility

428

print(f"Training data digest: {train_dataset.digest}")

429

print(f"Validation data digest: {val_dataset.digest}")

430

```

431

432

### Custom Dataset Sources

433

434

```python

435

import mlflow.data

436

from mlflow.data.dataset_source import DatasetSource

437

438

class S3DatasetSource(DatasetSource):

439

"""Custom dataset source for S3 data."""

440

441

def __init__(self, s3_path, credentials=None):

442

self.s3_path = s3_path

443

self.credentials = credentials

444

445

def load(self, dst_path=None):

446

"""Load dataset from S3."""

447

import boto3

448

import pandas as pd

449

450

s3 = boto3.client('s3')

451

# Implementation for loading from S3

452

return pd.read_csv(self.s3_path)

453

454

def to_json(self):

455

"""Serialize to JSON."""

456

return {

457

"s3_path": self.s3_path,

458

"source_type": "s3"

459

}

460

461

@staticmethod

462

def from_json(source_json):

463

"""Deserialize from JSON."""

464

return S3DatasetSource(source_json["s3_path"])

465

466

# Create dataset with custom source

467

s3_source = S3DatasetSource("s3://my-bucket/data/train.csv")

468

s3_dataset = mlflow.data.Dataset(

469

source=s3_source,

470

name="s3_training_data"

471

)

472

473

# Register custom source

474

mlflow.data.dataset_source_registry.register_source(S3DatasetSource)

475

```

476

477

### Meta Dataset Operations

478

479

```python

480

import mlflow.data

481

482

# Create individual datasets

483

train_dataset = mlflow.data.from_pandas(train_df, targets="target", name="train")

484

val_dataset = mlflow.data.from_pandas(val_df, targets="target", name="validation")

485

test_dataset = mlflow.data.from_pandas(test_df, targets="target", name="test")

486

487

# Create meta dataset

488

meta_dataset = mlflow.data.MetaDataset(

489

datasets=[train_dataset, val_dataset, test_dataset],

490

name="complete_ml_dataset",

491

tags={"project": "customer_classification", "version": "v2"}

492

)

493

494

# Add context to datasets

495

meta_dataset.add_dataset(train_dataset, context="training")

496

meta_dataset.add_dataset(val_dataset, context="validation")

497

meta_dataset.add_dataset(test_dataset, context="testing")

498

499

# Log meta dataset

500

with mlflow.start_run():

501

mlflow.data.log_input(meta_dataset, context="complete_pipeline")

502

503

# Access datasets by context

504

training_datasets = meta_dataset.get_datasets(context="training")

505

validation_datasets = meta_dataset.get_datasets(context="validation")

506

507

print(f"Training datasets: {len(training_datasets)}")

508

print(f"Validation datasets: {len(validation_datasets)}")

509

```

510

511

### Dataset Schema Validation

512

513

```python

514

import mlflow.data

515

from mlflow.models import infer_signature

516

517

# Create dataset with schema validation

518

df = pd.DataFrame({

519

'feature1': [1.0, 2.0, 3.0],

520

'feature2': ['A', 'B', 'C'],

521

'target': [0, 1, 0]

522

})

523

524

dataset = mlflow.data.from_pandas(

525

df=df,

526

targets="target",

527

name="schema_validated_data"

528

)

529

530

# Infer and validate schema

531

expected_schema = infer_signature(df.drop('target', axis=1), df['target'])

532

533

# Validate new data against schema

534

new_df = pd.DataFrame({

535

'feature1': [4.0, 5.0],

536

'feature2': ['D', 'E'],

537

'target': [1, 0]

538

})

539

540

try:

541

from mlflow.models.utils import validate_schema

542

validate_schema(new_df.drop('target', axis=1), expected_schema.inputs)

543

print("Schema validation passed")

544

except Exception as e:

545

print(f"Schema validation failed: {e}")

546

547

# Log with schema validation

548

with mlflow.start_run():

549

mlflow.data.log_input(dataset, context="training")

550

mlflow.log_param("schema_validation", "enabled")

551

```

552

553

## Types

554

555

```python { .api }

556

from mlflow.data.dataset import Dataset

557

from mlflow.data.dataset_source import DatasetSource

558

from mlflow.entities import Dataset as DatasetEntity, DatasetInput

559

from mlflow.data.meta_dataset import MetaDataset

560

561

class Dataset:

562

df: Any

563

source: DatasetSource

564

targets: Optional[str]

565

name: Optional[str]

566

digest: Optional[str]

567

path: Optional[str]

568

569

class DatasetSource:

570

def load(self, dst_path: Optional[str] = None) -> Any: ...

571

def to_json(self) -> str: ...

572

573

@staticmethod

574

def from_json(source_json: str) -> 'DatasetSource': ...

575

576

class DatasetEntity:

577

name: str

578

digest: str

579

source_type: str

580

source: str

581

schema: Optional[str]

582

profile: Optional[str]

583

584

class DatasetInput:

585

dataset: DatasetEntity

586

tags: List[InputTag]

587

588

class MetaDataset:

589

datasets: List[Dataset]

590

name: Optional[str]

591

digest: Optional[str]

592

tags: Optional[Dict[str, str]]

593

594

# Built-in dataset source types

595

class PandasDatasetSource(DatasetSource):

596

path: Optional[str]

597

598

class NumpyDatasetSource(DatasetSource):

599

path: Optional[str]

600

601

class SparkDatasetSource(DatasetSource):

602

path: Optional[str]

603

604

class DeltaDatasetSource(DatasetSource):

605

table_name: str

606

version: Optional[int]

607

timestamp: Optional[str]

608

609

class HuggingFaceDatasetSource(DatasetSource):

610

path: str

611

revision: Optional[str]

612

```