Or run:

    npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-distributed.md, data-processing.md, distributed-training.md, hyperparameter-tuning.md, index.md, model-serving.md, reinforcement-learning.md, utilities-advanced.md

docs/data-processing.md

0

# Data Processing

1

2

Ray Data provides distributed data processing capabilities for ML workloads. It offers scalable dataset operations, transformations, and integrations with ML frameworks and storage systems.

3

4

## Capabilities

5

6

### Dataset Creation

7

8

Create datasets from various data sources.

9

10

```python { .api }

11

def read_parquet(paths, *, filesystem=None, columns=None, **kwargs):

12

"""

13

Read Parquet files into a Dataset.

14

15

Args:

16

paths (str/list): Path(s) to Parquet files

17

filesystem: Filesystem to use

18

columns (list, optional): Columns to read

19

20

Returns:

21

Dataset: Ray Dataset

22

"""

23

24

def read_csv(paths, *, filesystem=None, **kwargs):

25

"""

26

Read CSV files into a Dataset.

27

28

Args:

29

paths (str/list): Path(s) to CSV files

30

filesystem: Filesystem to use

31

**kwargs: Additional CSV reading options

32

33

Returns:

34

Dataset: Ray Dataset

35

"""

36

37

def read_json(paths, *, filesystem=None, **kwargs):

38

"""

39

Read JSON files into a Dataset.

40

41

Args:

42

paths (str/list): Path(s) to JSON files

43

filesystem: Filesystem to use

44

**kwargs: Additional JSON reading options

45

46

Returns:

47

Dataset: Ray Dataset

48

"""

49

50

def read_text(paths, *, encoding="utf-8", **kwargs):

51

"""

52

Read text files into a Dataset.

53

54

Args:

55

paths (str/list): Path(s) to text files

56

encoding (str): Text encoding

57

58

Returns:

59

Dataset: Ray Dataset

60

"""

61

62

def read_binary_files(paths, *, include_paths=False, **kwargs):

63

"""

64

Read binary files into a Dataset.

65

66

Args:

67

paths (str/list): Path(s) to binary files

68

include_paths (bool): Include file paths in output

69

70

Returns:

71

Dataset: Ray Dataset

72

"""

73

74

def read_images(paths, *, mode="RGB", **kwargs):

75

"""

76

Read image files into a Dataset.

77

78

Args:

79

paths (str/list): Path(s) to image files

80

mode (str): Image mode (RGB, RGBA, etc.)

81

82

Returns:

83

Dataset: Ray Dataset

84

"""

85

86

def from_items(items, *, parallelism=None):

87

"""

88

Create Dataset from list of items.

89

90

Args:

91

items (list): List of items

92

parallelism (int, optional): Parallelism level

93

94

Returns:

95

Dataset: Ray Dataset

96

"""

97

98

def read_bigquery(query, **kwargs):

99

"""

100

Read from BigQuery into a Dataset.

101

102

Args:

103

query (str): BigQuery SQL query

104

**kwargs: Additional BigQuery options

105

106

Returns:

107

Dataset: Ray Dataset

108

"""

109

110

def read_databricks_tables(table, **kwargs):

111

"""

112

Read Databricks tables into a Dataset.

113

114

Args:

115

table (str): Databricks table name

116

**kwargs: Additional Databricks options

117

118

Returns:

119

Dataset: Ray Dataset

120

"""

121

122

def read_delta(table_uri, **kwargs):

123

"""

124

Read Delta Lake table into a Dataset.

125

126

Args:

127

table_uri (str): Delta table URI

128

**kwargs: Additional Delta Lake options

129

130

Returns:

131

Dataset: Ray Dataset

132

"""

133

134

def read_hudi(table_uri, **kwargs):

135

"""

136

Read Apache Hudi table into a Dataset.

137

138

Args:

139

table_uri (str): Hudi table URI

140

**kwargs: Additional Hudi options

141

142

Returns:

143

Dataset: Ray Dataset

144

"""

145

146

def read_iceberg(table_uri, **kwargs):

147

"""

148

Read Apache Iceberg table into a Dataset.

149

150

Args:

151

table_uri (str): Iceberg table URI

152

**kwargs: Additional Iceberg options

153

154

Returns:

155

Dataset: Ray Dataset

156

"""

157

158

def read_mongo(uri, database, collection, **kwargs):

159

"""

160

Read from MongoDB into a Dataset.

161

162

Args:

163

uri (str): MongoDB connection URI

164

database (str): Database name

165

collection (str): Collection name

166

**kwargs: Additional MongoDB options

167

168

Returns:

169

Dataset: Ray Dataset

170

"""

171

172

def read_snowflake(query, connection_params, **kwargs):

173

"""

174

Read from Snowflake into a Dataset.

175

176

Args:

177

query (str): Snowflake SQL query

178

connection_params (dict): Connection parameters

179

**kwargs: Additional Snowflake options

180

181

Returns:

182

Dataset: Ray Dataset

183

"""

184

185

def read_tfrecords(paths, **kwargs):

186

"""

187

Read TensorFlow Records into a Dataset.

188

189

Args:

190

paths (str/list): Path(s) to TFRecord files

191

**kwargs: Additional TFRecord options

192

193

Returns:

194

Dataset: Ray Dataset

195

"""

196

197

def read_avro(paths, **kwargs):

198

"""

199

Read Apache Avro files into a Dataset.

200

201

Args:

202

paths (str/list): Path(s) to Avro files

203

**kwargs: Additional Avro options

204

205

Returns:

206

Dataset: Ray Dataset

207

"""

208

209

def read_lance(uri, **kwargs):

210

"""

211

Read Lance columnar format into a Dataset.

212

213

Args:

214

uri (str): Lance dataset URI

215

**kwargs: Additional Lance options

216

217

Returns:

218

Dataset: Ray Dataset

219

"""

220

221

def read_audio(paths, **kwargs):

222

"""

223

Read audio files into a Dataset.

224

225

Args:

226

paths (str/list): Path(s) to audio files

227

**kwargs: Additional audio processing options

228

229

Returns:

230

Dataset: Ray Dataset

231

"""

232

233

def read_videos(paths, **kwargs):

234

"""

235

Read video files into a Dataset.

236

237

Args:

238

paths (str/list): Path(s) to video files

239

**kwargs: Additional video processing options

240

241

Returns:

242

Dataset: Ray Dataset

243

"""

244

245

def from_huggingface(dataset, **kwargs):

246

"""

247

Create Dataset from HuggingFace dataset.

248

249

Args:

250

dataset: HuggingFace dataset object

251

**kwargs: Additional conversion options

252

253

Returns:

254

Dataset: Ray Dataset

255

"""

256

257

def range(n, *, parallelism=None):

258

"""

259

Create Dataset from range of integers.

260

261

Args:

262

n (int): Upper bound (exclusive)

263

parallelism (int, optional): Parallelism level

264

265

Returns:

266

Dataset: Ray Dataset

267

"""

268

269

def range_tensor(n, *, shape=(), dtype="float32", **kwargs):

270

"""

271

Create Dataset of tensors from range.

272

273

Args:

274

n (int): Number of tensors

275

shape (tuple): Tensor shape

276

dtype (str): Tensor data type

277

278

Returns:

279

Dataset: Ray Dataset

280

"""

281

```

282

283

### Dataset Transformations

284

285

Transform and process dataset contents.

286

287

```python { .api }

288

class Dataset:

289

"""Ray Dataset for distributed data processing."""

290

291

def map(self, fn, *, compute=None, **kwargs):

292

"""

293

Apply function to each row.

294

295

Args:

296

fn: Function to apply

297

compute (str, optional): Compute strategy

298

299

Returns:

300

Dataset: Transformed dataset

301

"""

302

303

def map_batches(self, fn, *, batch_size=None, **kwargs):

304

"""

305

Apply function to batches of rows.

306

307

Args:

308

fn: Function to apply to batches

309

batch_size (int, optional): Batch size

310

311

Returns:

312

Dataset: Transformed dataset

313

"""

314

315

def flat_map(self, fn, *, compute=None, **kwargs):

316

"""

317

Apply function and flatten results.

318

319

Args:

320

fn: Function that returns iterable

321

compute (str, optional): Compute strategy

322

323

Returns:

324

Dataset: Transformed dataset

325

"""

326

327

def filter(self, fn, *, compute=None):

328

"""

329

Filter rows using predicate function.

330

331

Args:

332

fn: Predicate function

333

compute (str, optional): Compute strategy

334

335

Returns:

336

Dataset: Filtered dataset

337

"""

338

339

def repartition(self, num_blocks, *, shuffle=False):

340

"""

341

Repartition dataset into specified number of blocks.

342

343

Args:

344

num_blocks (int): Number of blocks

345

shuffle (bool): Whether to shuffle data

346

347

Returns:

348

Dataset: Repartitioned dataset

349

"""

350

351

def random_shuffle(self, *, seed=None, num_blocks=None):

352

"""

353

Randomly shuffle dataset rows.

354

355

Args:

356

seed (int, optional): Random seed

357

num_blocks (int, optional): Number of blocks

358

359

Returns:

360

Dataset: Shuffled dataset

361

"""

362

363

def sort(self, key=None, *, descending=False):

364

"""

365

Sort dataset by key function.

366

367

Args:

368

key: Key function or column name

369

descending (bool): Sort in descending order

370

371

Returns:

372

Dataset: Sorted dataset

373

"""

374

375

def groupby(self, key):

376

"""

377

Group dataset by key function.

378

379

Args:

380

key: Key function or column name

381

382

Returns:

383

GroupedDataset: Grouped dataset

384

"""

385

386

def union(self, *other):

387

"""

388

Union with other datasets.

389

390

Args:

391

*other: Other datasets to union with

392

393

Returns:

394

Dataset: Union dataset

395

"""

396

397

def zip(self, other):

398

"""

399

Zip with another dataset.

400

401

Args:

402

other (Dataset): Dataset to zip with

403

404

Returns:

405

Dataset: Zipped dataset

406

"""

407

408

def add_column(self, col, fn, *, compute=None):

409

"""

410

Add new column to dataset.

411

412

Args:

413

col (str): Column name

414

fn: Function to compute column values

415

compute (str, optional): Compute strategy

416

417

Returns:

418

Dataset: Dataset with new column

419

"""

420

421

def drop_columns(self, cols):

422

"""

423

Drop columns from dataset.

424

425

Args:

426

cols (list): Column names to drop

427

428

Returns:

429

Dataset: Dataset with columns dropped

430

"""

431

432

def select_columns(self, cols):

433

"""

434

Select specific columns from dataset.

435

436

Args:

437

cols (list): Column names to select

438

439

Returns:

440

Dataset: Dataset with selected columns

441

"""

442

443

def rename_columns(self, columns):

444

"""

445

Rename dataset columns.

446

447

Args:

448

columns (dict): Mapping of old to new column names

449

450

Returns:

451

Dataset: Dataset with renamed columns

452

"""

453

454

def repartition(self, num_blocks, *, shuffle=False):

455

"""

456

Repartition dataset into specified number of blocks.

457

458

Args:

459

num_blocks (int): Target number of blocks

460

shuffle (bool): Whether to shuffle data

461

462

Returns:

463

Dataset: Repartitioned dataset

464

"""

465

```

466

467

### Dataset I/O and Persistence

468

469

Save datasets and convert to other formats.

470

471

```python { .api }

472

class Dataset:

473

def write_parquet(self, path, *, filesystem=None, **kwargs):

474

"""

475

Write dataset to Parquet files.

476

477

Args:

478

path (str): Output path

479

filesystem: Filesystem to use

480

"""

481

482

def write_csv(self, path, *, filesystem=None, **kwargs):

483

"""

484

Write dataset to CSV files.

485

486

Args:

487

path (str): Output path

488

filesystem: Filesystem to use

489

"""

490

491

def write_json(self, path, *, filesystem=None, **kwargs):

492

"""

493

Write dataset to JSON files.

494

495

Args:

496

path (str): Output path

497

filesystem: Filesystem to use

498

"""

499

500

def to_torch(self, *, label_column=None, feature_columns=None,

501

batch_size=1, **kwargs):

502

"""

503

Convert to PyTorch IterableDataset.

504

505

Args:

506

label_column (str, optional): Label column name

507

feature_columns (list, optional): Feature column names

508

batch_size (int): Batch size

509

510

Returns:

511

TorchIterableDataset: PyTorch dataset

512

"""

513

514

def to_tf(self, *, label_column=None, feature_columns=None,

515

batch_size=1, **kwargs):

516

"""

517

Convert to TensorFlow Dataset.

518

519

Args:

520

label_column (str, optional): Label column name

521

feature_columns (list, optional): Feature column names

522

batch_size (int): Batch size

523

524

Returns:

525

tf.data.Dataset: TensorFlow dataset

526

"""

527

528

def to_pandas(self, *, limit=None):

529

"""

530

Convert to Pandas DataFrame.

531

532

Args:

533

limit (int, optional): Row limit

534

535

Returns:

536

pandas.DataFrame: Pandas DataFrame

537

"""

538

539

def to_arrow(self):

540

"""

541

Convert to PyArrow Table.

542

543

Returns:

544

pyarrow.Table: Arrow table

545

"""

546

547

def iter_rows(self, *, prefetch_blocks=0):

548

"""

549

Iterate over dataset rows.

550

551

Args:

552

prefetch_blocks (int): Number of blocks to prefetch

553

554

Yields:

555

Row data

556

"""

557

558

def iter_batches(self, *, batch_size=None, prefetch_blocks=0):

559

"""

560

Iterate over dataset batches.

561

562

Args:

563

batch_size (int, optional): Batch size

564

prefetch_blocks (int): Number of blocks to prefetch

565

566

Yields:

567

Batch data

568

"""

569

```

570

571

### Dataset Information and Statistics

572

573

Get information about datasets.

574

575

```python { .api }

576

class Dataset:

577

def count(self):

578

"""

579

Count total number of rows.

580

581

Returns:

582

int: Row count

583

"""

584

585

def schema(self):

586

"""

587

Get dataset schema.

588

589

Returns:

590

Schema: Dataset schema

591

"""

592

593

def columns(self):

594

"""

595

Get column names.

596

597

Returns:

598

list: Column names

599

"""

600

601

def stats(self):

602

"""

603

Get dataset statistics.

604

605

Returns:

606

DatasetStats: Dataset statistics

607

"""

608

609

def show(self, limit=20):

610

"""

611

Display dataset contents.

612

613

Args:

614

limit (int): Number of rows to show

615

"""

616

617

def take(self, limit=20):

618

"""

619

Take first N rows.

620

621

Args:

622

limit (int): Number of rows to take

623

624

Returns:

625

list: Row data

626

"""

627

628

def take_batch(self, batch_size=20):

629

"""

630

Take first batch.

631

632

Args:

633

batch_size (int): Batch size

634

635

Returns:

636

Batch data

637

"""

638

```

639

640

### Grouped Dataset Operations

641

642

Operations on grouped datasets.

643

644

```python { .api }

645

class GroupedDataset:

646

"""Grouped dataset for aggregation operations."""

647

648

def count(self):

649

"""

650

Count rows in each group.

651

652

Returns:

653

Dataset: Dataset with group counts

654

"""

655

656

def sum(self, *columns):

657

"""

658

Sum columns in each group.

659

660

Args:

661

*columns: Columns to sum

662

663

Returns:

664

Dataset: Dataset with group sums

665

"""

666

667

def min(self, *columns):

668

"""

669

Find minimum values in each group.

670

671

Args:

672

*columns: Columns to find min for

673

674

Returns:

675

Dataset: Dataset with group minimums

676

"""

677

678

def max(self, *columns):

679

"""

680

Find maximum values in each group.

681

682

Args:

683

*columns: Columns to find max for

684

685

Returns:

686

Dataset: Dataset with group maximums

687

"""

688

689

def mean(self, *columns):

690

"""

691

Calculate mean values in each group.

692

693

Args:

694

*columns: Columns to calculate mean for

695

696

Returns:

697

Dataset: Dataset with group means

698

"""

699

700

def std(self, *columns):

701

"""

702

Calculate standard deviation in each group.

703

704

Args:

705

*columns: Columns to calculate std for

706

707

Returns:

708

Dataset: Dataset with group standard deviations

709

"""

710

```

711

712

## Usage Examples

713

714

### Basic Dataset Operations

715

716

```python
import ray

# Initialize Ray
ray.init()

# Create dataset from files
ds = ray.data.read_csv("s3://my-bucket/data.csv")

# Transform data
ds = ds.map(lambda row: {"value": row["value"] * 2})

# Filter data
ds = ds.filter(lambda row: row["value"] > 10)

# Convert to PyTorch
torch_ds = ds.to_torch(batch_size=32)

# Write results
ds.write_parquet("s3://my-bucket/output/")
```

737

738

### ML Pipeline Example

739

740

```python
import ray

ray.init()

# Load training data
train_ds = ray.data.read_parquet("train.parquet")

# Preprocess data
def preprocess(batch):
    # Normalize features
    batch["features"] = (batch["features"] - batch["features"].mean()) / batch["features"].std()
    return batch

train_ds = train_ds.map_batches(preprocess)

# Split features and labels
train_ds = train_ds.map(lambda row: {
    "features": row["features"],
    "label": row["target"]
})

# Convert to PyTorch for training
torch_ds = train_ds.to_torch(
    label_column="label",
    feature_columns=["features"],
    batch_size=64
)
```

769

770

### Data Analysis Example

771

772

```python
import ray

ray.init()

# Load dataset
ds = ray.data.read_json("events.json")

# Group by category and aggregate
grouped = ds.groupby("category")
stats = grouped.count()
stats.show()

# Calculate statistics
print(f"Total rows: {ds.count()}")
print(f"Schema: {ds.schema()}")
ds.stats()
```