# Data Processing

Ray Data provides distributed data processing capabilities for ML workloads. It offers scalable dataset operations, transformations, and integrations with ML frameworks and storage systems.

## Capabilities

### Dataset Creation

Create datasets from various data sources.

```python { .api }
def read_parquet(paths, *, filesystem=None, columns=None, **kwargs):
    """
    Read Parquet files into a Dataset.

    Args:
        paths (str/list): Path(s) to Parquet files
        filesystem: Filesystem to use
        columns (list, optional): Columns to read

    Returns:
        Dataset: Ray Dataset
    """

def read_csv(paths, *, filesystem=None, **kwargs):
    """
    Read CSV files into a Dataset.

    Args:
        paths (str/list): Path(s) to CSV files
        filesystem: Filesystem to use
        **kwargs: Additional CSV reading options

    Returns:
        Dataset: Ray Dataset
    """

def read_json(paths, *, filesystem=None, **kwargs):
    """
    Read JSON files into a Dataset.

    Args:
        paths (str/list): Path(s) to JSON files
        filesystem: Filesystem to use
        **kwargs: Additional JSON reading options

    Returns:
        Dataset: Ray Dataset
    """

def read_text(paths, *, encoding="utf-8", **kwargs):
    """
    Read text files into a Dataset.

    Args:
        paths (str/list): Path(s) to text files
        encoding (str): Text encoding

    Returns:
        Dataset: Ray Dataset
    """

def read_binary_files(paths, *, include_paths=False, **kwargs):
    """
    Read binary files into a Dataset.

    Args:
        paths (str/list): Path(s) to binary files
        include_paths (bool): Include file paths in output

    Returns:
        Dataset: Ray Dataset
    """

def read_images(paths, *, mode="RGB", **kwargs):
    """
    Read image files into a Dataset.

    Args:
        paths (str/list): Path(s) to image files
        mode (str): Image mode (RGB, RGBA, etc.)

    Returns:
        Dataset: Ray Dataset
    """

def from_items(items, *, parallelism=None):
    """
    Create Dataset from list of items.

    Args:
        items (list): List of items
        parallelism (int, optional): Parallelism level

    Returns:
        Dataset: Ray Dataset
    """

def read_bigquery(query, **kwargs):
    """
    Read from BigQuery into a Dataset.

    Args:
        query (str): BigQuery SQL query
        **kwargs: Additional BigQuery options

    Returns:
        Dataset: Ray Dataset
    """

def read_databricks_tables(table, **kwargs):
    """
    Read Databricks tables into a Dataset.

    Args:
        table (str): Databricks table name
        **kwargs: Additional Databricks options

    Returns:
        Dataset: Ray Dataset
    """

def read_delta(table_uri, **kwargs):
    """
    Read Delta Lake table into a Dataset.

    Args:
        table_uri (str): Delta table URI
        **kwargs: Additional Delta Lake options

    Returns:
        Dataset: Ray Dataset
    """

def read_hudi(table_uri, **kwargs):
    """
    Read Apache Hudi table into a Dataset.

    Args:
        table_uri (str): Hudi table URI
        **kwargs: Additional Hudi options

    Returns:
        Dataset: Ray Dataset
    """

def read_iceberg(table_uri, **kwargs):
    """
    Read Apache Iceberg table into a Dataset.

    Args:
        table_uri (str): Iceberg table URI
        **kwargs: Additional Iceberg options

    Returns:
        Dataset: Ray Dataset
    """

def read_mongo(uri, database, collection, **kwargs):
    """
    Read from MongoDB into a Dataset.

    Args:
        uri (str): MongoDB connection URI
        database (str): Database name
        collection (str): Collection name
        **kwargs: Additional MongoDB options

    Returns:
        Dataset: Ray Dataset
    """

def read_snowflake(query, connection_params, **kwargs):
    """
    Read from Snowflake into a Dataset.

    Args:
        query (str): Snowflake SQL query
        connection_params (dict): Connection parameters
        **kwargs: Additional Snowflake options

    Returns:
        Dataset: Ray Dataset
    """

def read_tfrecords(paths, **kwargs):
    """
    Read TensorFlow Records into a Dataset.

    Args:
        paths (str/list): Path(s) to TFRecord files
        **kwargs: Additional TFRecord options

    Returns:
        Dataset: Ray Dataset
    """

def read_avro(paths, **kwargs):
    """
    Read Apache Avro files into a Dataset.

    Args:
        paths (str/list): Path(s) to Avro files
        **kwargs: Additional Avro options

    Returns:
        Dataset: Ray Dataset
    """

def read_lance(uri, **kwargs):
    """
    Read Lance columnar format into a Dataset.

    Args:
        uri (str): Lance dataset URI
        **kwargs: Additional Lance options

    Returns:
        Dataset: Ray Dataset
    """

def read_audio(paths, **kwargs):
    """
    Read audio files into a Dataset.

    Args:
        paths (str/list): Path(s) to audio files
        **kwargs: Additional audio processing options

    Returns:
        Dataset: Ray Dataset
    """

def read_videos(paths, **kwargs):
    """
    Read video files into a Dataset.

    Args:
        paths (str/list): Path(s) to video files
        **kwargs: Additional video processing options

    Returns:
        Dataset: Ray Dataset
    """

def from_huggingface(dataset, **kwargs):
    """
    Create Dataset from HuggingFace dataset.

    Args:
        dataset: HuggingFace dataset object
        **kwargs: Additional conversion options

    Returns:
        Dataset: Ray Dataset
    """

def range(n, *, parallelism=None):
    """
    Create Dataset from range of integers.

    Args:
        n (int): Upper bound (exclusive)
        parallelism (int, optional): Parallelism level

    Returns:
        Dataset: Ray Dataset
    """

def range_tensor(n, *, shape=(), dtype="float32", **kwargs):
    """
    Create Dataset of tensors from range.

    Args:
        n (int): Number of tensors
        shape (tuple): Tensor shape
        dtype (str): Tensor data type

    Returns:
        Dataset: Ray Dataset
    """
```

### Dataset Transformations

Transform and process dataset contents.

```python { .api }
class Dataset:
    """Ray Dataset for distributed data processing."""

    def map(self, fn, *, compute=None, **kwargs):
        """
        Apply function to each row.

        Args:
            fn: Function to apply
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Transformed dataset
        """

    def map_batches(self, fn, *, batch_size=None, **kwargs):
        """
        Apply function to batches of rows.

        Args:
            fn: Function to apply to batches
            batch_size (int, optional): Batch size

        Returns:
            Dataset: Transformed dataset
        """

    def flat_map(self, fn, *, compute=None, **kwargs):
        """
        Apply function and flatten results.

        Args:
            fn: Function that returns iterable
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Transformed dataset
        """

    def filter(self, fn, *, compute=None):
        """
        Filter rows using predicate function.

        Args:
            fn: Predicate function
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Filtered dataset
        """

    def repartition(self, num_blocks, *, shuffle=False):
        """
        Repartition dataset into specified number of blocks.

        Args:
            num_blocks (int): Number of blocks
            shuffle (bool): Whether to shuffle data

        Returns:
            Dataset: Repartitioned dataset
        """

    def random_shuffle(self, *, seed=None, num_blocks=None):
        """
        Randomly shuffle dataset rows.

        Args:
            seed (int, optional): Random seed
            num_blocks (int, optional): Number of blocks

        Returns:
            Dataset: Shuffled dataset
        """

    def sort(self, key=None, *, descending=False):
        """
        Sort dataset by key function.

        Args:
            key: Key function or column name
            descending (bool): Sort in descending order

        Returns:
            Dataset: Sorted dataset
        """

    def groupby(self, key):
        """
        Group dataset by key function.

        Args:
            key: Key function or column name

        Returns:
            GroupedDataset: Grouped dataset
        """

    def union(self, *other):
        """
        Union with other datasets.

        Args:
            *other: Other datasets to union with

        Returns:
            Dataset: Union dataset
        """

    def zip(self, other):
        """
        Zip with another dataset.

        Args:
            other (Dataset): Dataset to zip with

        Returns:
            Dataset: Zipped dataset
        """

    def add_column(self, col, fn, *, compute=None):
        """
        Add new column to dataset.

        Args:
            col (str): Column name
            fn: Function to compute column values
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Dataset with new column
        """

    def drop_columns(self, cols):
        """
        Drop columns from dataset.

        Args:
            cols (list): Column names to drop

        Returns:
            Dataset: Dataset with columns dropped
        """

    def select_columns(self, cols):
        """
        Select specific columns from dataset.

        Args:
            cols (list): Column names to select

        Returns:
            Dataset: Dataset with selected columns
        """

    def rename_columns(self, columns):
        """
        Rename dataset columns.

        Args:
            columns (dict): Mapping of old to new column names

        Returns:
            Dataset: Dataset with renamed columns
        """
```

### Dataset I/O and Persistence

Save datasets and convert to other formats.

```python { .api }
class Dataset:
    def write_parquet(self, path, *, filesystem=None, **kwargs):
        """
        Write dataset to Parquet files.

        Args:
            path (str): Output path
            filesystem: Filesystem to use
        """

    def write_csv(self, path, *, filesystem=None, **kwargs):
        """
        Write dataset to CSV files.

        Args:
            path (str): Output path
            filesystem: Filesystem to use
        """

    def write_json(self, path, *, filesystem=None, **kwargs):
        """
        Write dataset to JSON files.

        Args:
            path (str): Output path
            filesystem: Filesystem to use
        """

    def to_torch(self, *, label_column=None, feature_columns=None,
                 batch_size=1, **kwargs):
        """
        Convert to PyTorch IterableDataset.

        Args:
            label_column (str, optional): Label column name
            feature_columns (list, optional): Feature column names
            batch_size (int): Batch size

        Returns:
            TorchIterableDataset: PyTorch dataset
        """

    def to_tf(self, *, label_column=None, feature_columns=None,
              batch_size=1, **kwargs):
        """
        Convert to TensorFlow Dataset.

        Args:
            label_column (str, optional): Label column name
            feature_columns (list, optional): Feature column names
            batch_size (int): Batch size

        Returns:
            tf.data.Dataset: TensorFlow dataset
        """

    def to_pandas(self, *, limit=None):
        """
        Convert to Pandas DataFrame.

        Args:
            limit (int, optional): Row limit

        Returns:
            pandas.DataFrame: Pandas DataFrame
        """

    def to_arrow(self):
        """
        Convert to PyArrow Table.

        Returns:
            pyarrow.Table: Arrow table
        """

    def iter_rows(self, *, prefetch_blocks=0):
        """
        Iterate over dataset rows.

        Args:
            prefetch_blocks (int): Number of blocks to prefetch

        Yields:
            Row data
        """

    def iter_batches(self, *, batch_size=None, prefetch_blocks=0):
        """
        Iterate over dataset batches.

        Args:
            batch_size (int, optional): Batch size
            prefetch_blocks (int): Number of blocks to prefetch

        Yields:
            Batch data
        """
```

### Dataset Information and Statistics

Get information about datasets.

```python { .api }
class Dataset:
    def count(self):
        """
        Count total number of rows.

        Returns:
            int: Row count
        """

    def schema(self):
        """
        Get dataset schema.

        Returns:
            Schema: Dataset schema
        """

    def columns(self):
        """
        Get column names.

        Returns:
            list: Column names
        """

    def stats(self):
        """
        Get dataset statistics.

        Returns:
            DatasetStats: Dataset statistics
        """

    def show(self, limit=20):
        """
        Display dataset contents.

        Args:
            limit (int): Number of rows to show
        """

    def take(self, limit=20):
        """
        Take first N rows.

        Args:
            limit (int): Number of rows to take

        Returns:
            list: Row data
        """

    def take_batch(self, batch_size=20):
        """
        Take first batch.

        Args:
            batch_size (int): Batch size

        Returns:
            Batch data
        """
```

### Grouped Dataset Operations

Operations on grouped datasets.

```python { .api }
class GroupedDataset:
    """Grouped dataset for aggregation operations."""

    def count(self):
        """
        Count rows in each group.

        Returns:
            Dataset: Dataset with group counts
        """

    def sum(self, *columns):
        """
        Sum columns in each group.

        Args:
            *columns: Columns to sum

        Returns:
            Dataset: Dataset with group sums
        """

    def min(self, *columns):
        """
        Find minimum values in each group.

        Args:
            *columns: Columns to find min for

        Returns:
            Dataset: Dataset with group minimums
        """

    def max(self, *columns):
        """
        Find maximum values in each group.

        Args:
            *columns: Columns to find max for

        Returns:
            Dataset: Dataset with group maximums
        """

    def mean(self, *columns):
        """
        Calculate mean values in each group.

        Args:
            *columns: Columns to calculate mean for

        Returns:
            Dataset: Dataset with group means
        """

    def std(self, *columns):
        """
        Calculate standard deviation in each group.

        Args:
            *columns: Columns to calculate std for

        Returns:
            Dataset: Dataset with group standard deviations
        """
```

## Usage Examples

### Basic Dataset Operations

```python
import ray

# Initialize Ray
ray.init()

# Create dataset from files
ds = ray.data.read_csv("s3://my-bucket/data.csv")

# Transform data
ds = ds.map(lambda row: {"value": row["value"] * 2})

# Filter data
ds = ds.filter(lambda row: row["value"] > 10)

# Convert to PyTorch
torch_ds = ds.to_torch(batch_size=32)

# Write results
ds.write_parquet("s3://my-bucket/output/")
```

### ML Pipeline Example

```python
import ray

ray.init()

# Load training data
train_ds = ray.data.read_parquet("train.parquet")

# Preprocess data
def preprocess(batch):
    # Normalize features
    batch["features"] = (batch["features"] - batch["features"].mean()) / batch["features"].std()
    return batch

train_ds = train_ds.map_batches(preprocess)

# Split features and labels
train_ds = train_ds.map(lambda row: {
    "features": row["features"],
    "label": row["target"]
})

# Convert to PyTorch for training
torch_ds = train_ds.to_torch(
    label_column="label",
    feature_columns=["features"],
    batch_size=64
)
```

### Data Analysis Example

```python
import ray

ray.init()

# Load dataset
ds = ray.data.read_json("events.json")

# Group by category and aggregate
grouped = ds.groupby("category")
stats = grouped.count()
stats.show()

# Calculate statistics
print(f"Total rows: {ds.count()}")
print(f"Schema: {ds.schema()}")
ds.stats()
```