# Stream Processing Operations

This document covers the detailed stream transformation operations available in Ray Streaming, including transformation functions, partitioning strategies, and advanced stream processing patterns.

## Overview

Ray Streaming provides a comprehensive set of stream transformation operations:

- **Element Transformations**: map, flat_map, filter
- **Keyed Operations**: key_by, reduce
- **Stream Composition**: union
- **Partitioning**: broadcast, partition_by, forward
- **Output Operations**: sink
- **Function Interfaces**: MapFunction, FilterFunction, ReduceFunction, etc.

## Function Interfaces

Ray Streaming defines function interfaces for type-safe and extensible stream processing.

### Core Function Interface

```python { .api }
from ray.streaming.function import Function

class Function:
    def open(self, runtime_context) -> None
    def close(self) -> None
    def save_checkpoint(self) -> object
    def load_checkpoint(self, checkpoint_obj) -> None
```
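
The lifecycle contract above can be illustrated with a plain-Python sketch. The class below is a stand-in, not a subclass of the real `ray.streaming.function.Function`: it shows the intended call order (`open`, per-element processing, `save_checkpoint`, and `load_checkpoint` after a restart) without requiring the Ray runtime.

```python
class CountingFunction:
    """Stand-in illustrating the Function lifecycle hooks."""

    def open(self, runtime_context):
        # Called once before any elements are processed.
        self.count = 0

    def map(self, value):
        # Per-element processing that mutates operator state.
        self.count += 1
        return value

    def save_checkpoint(self):
        # Return a serializable snapshot of operator state.
        return {"count": self.count}

    def load_checkpoint(self, checkpoint_obj):
        # Restore state when the operator restarts after a failure.
        self.count = checkpoint_obj["count"]

    def close(self):
        # Called once when the job shuts down.
        pass

# Simulated lifecycle: process, checkpoint, restart, restore.
fn = CountingFunction()
fn.open(runtime_context=None)
for v in [1, 2, 3]:
    fn.map(v)
snapshot = fn.save_checkpoint()

restarted = CountingFunction()
restarted.open(runtime_context=None)
restarted.load_checkpoint(snapshot)
print(restarted.count)  # → 3
```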

### Transformation Function Interfaces

```python { .api }
# Map function interface
class MapFunction(Function):
    def map(self, value) -> object

# FlatMap function interface
class FlatMapFunction(Function):
    def flat_map(self, value, collector) -> None

# Filter function interface
class FilterFunction(Function):
    def filter(self, value) -> bool

# Key extraction function interface
class KeyFunction(Function):
    def key_by(self, value) -> object

# Reduce function interface
class ReduceFunction(Function):
    def reduce(self, old_value, new_value) -> object

# Sink function interface
class SinkFunction(Function):
    def sink(self, value) -> None
```

## Element Transformations

### Map Operation

Transform each element in the stream using a one-to-one function.

```python { .api }
def map(self, func) -> DataStream:
    """
    Apply a function to each element in the stream.

    Args:
        func: Function or MapFunction instance for transformation

    Returns:
        New DataStream with transformed elements
    """
```

#### Usage Examples

```python
from ray.streaming import StreamingContext
from ray.streaming.function import MapFunction

# Using a lambda function
ctx = StreamingContext.Builder().build()
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Doubled: {x}"))

# Using a custom MapFunction
class SquareMapFunction(MapFunction):
    def map(self, value):
        return value ** 2

ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(SquareMapFunction()) \
    .sink(lambda x: print(f"Squared: {x}"))

# Complex transformation
ctx.from_collection(["hello", "world", "ray", "streaming"]) \
    .map(lambda word: {"word": word, "length": len(word), "upper": word.upper()}) \
    .sink(lambda obj: print(f"Word: {obj['word']}, Length: {obj['length']}"))

ctx.submit("map_operations")
```

### FlatMap Operation

Transform each element into zero or more output elements.

```python { .api }
def flat_map(self, func) -> DataStream:
    """
    Transform each element into multiple output elements.

    Args:
        func: Function or FlatMapFunction that returns an iterable

    Returns:
        New DataStream with flattened results
    """
```

#### Usage Examples

```python
from ray.streaming.function import FlatMapFunction

# Split sentences into words
ctx.from_values("hello world", "ray streaming", "distributed computing") \
    .flat_map(lambda sentence: sentence.split()) \
    .map(lambda word: word.upper()) \
    .sink(lambda word: print(f"Word: {word}"))

# Using a custom FlatMapFunction
class TokenizeFlatMapFunction(FlatMapFunction):
    def flat_map(self, value, collector):
        words = value.split()
        for word in words:
            if len(word) > 3:  # Only emit words longer than 3 characters
                collector.collect(word.lower())

ctx.from_values("The quick brown fox jumps over the lazy dog") \
    .flat_map(TokenizeFlatMapFunction()) \
    .sink(print)

# Generate multiple outputs per input
ctx.from_collection([1, 2, 3]) \
    .flat_map(lambda x: [x, x * 2, x * 3]) \
    .sink(lambda x: print(f"Generated: {x}"))

ctx.submit("flatmap_operations")
```
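
The collector-based `flat_map` contract can be mimicked in plain Python, which is handy when unit-testing a FlatMapFunction's logic outside the Ray runtime. `ListCollector` here is a hypothetical test helper, not a Ray API:

```python
class ListCollector:
    """Hypothetical stand-in for the collector passed to flat_map."""

    def __init__(self):
        self.items = []

    def collect(self, value):
        self.items.append(value)

def tokenize_flat_map(value, collector):
    # Same logic as the TokenizeFlatMapFunction example above:
    # emit lowercased words longer than 3 characters.
    for word in value.split():
        if len(word) > 3:
            collector.collect(word.lower())

collector = ListCollector()
tokenize_flat_map("The quick brown fox jumps over the lazy dog", collector)
print(collector.items)  # → ['quick', 'brown', 'jumps', 'over', 'lazy']
```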

### Filter Operation

Keep only elements that satisfy a predicate condition.

```python { .api }
def filter(self, func) -> DataStream:
    """
    Filter elements based on a predicate function.

    Args:
        func: Function or FilterFunction returning a boolean

    Returns:
        New DataStream with filtered elements
    """
```

#### Usage Examples

```python
from ray.streaming.function import FilterFunction

# Simple filtering
ctx.from_collection(range(10)) \
    .filter(lambda x: x % 2 == 0) \
    .sink(lambda x: print(f"Even: {x}"))

# Using a custom FilterFunction
class PositiveFilterFunction(FilterFunction):
    def filter(self, value):
        return value > 0

ctx.from_collection([-3, -1, 0, 1, 5, -2, 8]) \
    .filter(PositiveFilterFunction()) \
    .sink(lambda x: print(f"Positive: {x}"))

# Complex filtering with string operations
ctx.from_values("apple", "banana", "cherry", "date", "elderberry") \
    .filter(lambda fruit: len(fruit) > 5 and 'e' in fruit) \
    .sink(lambda fruit: print(f"Long fruit with 'e': {fruit}"))

ctx.submit("filter_operations")
```

## Keyed Operations

### Key-By Operation

Partition the stream by key for stateful operations.

```python { .api }
def key_by(self, func) -> KeyDataStream:
    """
    Partition the stream by a key extracted using the provided function.

    Args:
        func: Function or KeyFunction to extract the key from elements

    Returns:
        KeyDataStream partitioned by the key function
    """
```

#### Usage Examples

```python
from ray.streaming.function import KeyFunction

# Group by key for word counting
ctx.from_values("apple", "banana", "apple", "cherry", "banana", "apple") \
    .map(lambda word: (word, 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Count: {result[0]} = {result[1]}"))

# Using a custom KeyFunction
class CategoryKeyFunction(KeyFunction):
    def key_by(self, value):
        # Group items by their category
        categories = {
            'fruits': ['apple', 'banana', 'cherry'],
            'vegetables': ['carrot', 'broccoli', 'spinach'],
            'grains': ['rice', 'wheat', 'oats']
        }
        for category, items in categories.items():
            if value in items:
                return category
        return 'other'

ctx.from_values("apple", "carrot", "banana", "rice", "broccoli") \
    .key_by(CategoryKeyFunction()) \
    .reduce(lambda old, new: f"{old},{new}") \
    .sink(lambda result: print(f"Category items: {result}"))

ctx.submit("keyed_operations")
```

### Reduce Operation

Combine elements with the same key using a reduce function.

```python { .api }
def reduce(self, func) -> DataStream:
    """
    Reduce elements with the same key using the provided function.

    Args:
        func: Function or ReduceFunction for combining values

    Returns:
        DataStream with reduced values per key
    """
```

#### Usage Examples

```python
from ray.streaming.function import ReduceFunction

# Sum values by key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20), ("A", 8)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Sum for {result[0]}: {result[1]}"))

# Using a custom ReduceFunction
class MaxReduceFunction(ReduceFunction):
    def reduce(self, old_value, new_value):
        return max(old_value, new_value, key=lambda x: x[1])

ctx.from_values(("user1", 100), ("user2", 85), ("user1", 120), ("user2", 95)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(MaxReduceFunction()) \
    .sink(lambda result: print(f"Max score for {result[0]}: {result[1]}"))

# String aggregation
ctx.from_values(("group1", "a"), ("group2", "x"), ("group1", "b"), ("group2", "y")) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Concatenated {result[0]}: {result[1]}"))

ctx.submit("reduce_operations")
```
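
A keyed reduce typically emits a running aggregate: each incoming element produces an updated result for its key, so the sink may see intermediate values before the final one. That per-key semantics can be sketched in plain Python (a simulation, not the Ray implementation):

```python
def running_reduce(pairs, reduce_fn):
    # Maintain per-key state and emit the running aggregate for
    # every incoming element, mirroring a keyed streaming reduce.
    state = {}
    emitted = []
    for pair in pairs:
        key = pair[0]
        state[key] = pair if key not in state else reduce_fn(state[key], pair)
        emitted.append(state[key])
    return emitted

results = running_reduce(
    [("A", 10), ("B", 5), ("A", 15)],
    lambda old, new: (old[0], old[1] + new[1]),
)
print(results)  # → [('A', 10), ('B', 5), ('A', 25)]
```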

## Stream Composition

### Union Operation

Merge multiple streams of the same type.

```python { .api }
def union(self, *streams) -> UnionStream:
    """
    Union this stream with other streams.

    Args:
        *streams: DataStreams to union with this stream

    Returns:
        UnionStream containing elements from all input streams
    """
```

#### Usage Examples

```python
# Union multiple data sources
stream1 = ctx.from_values("source1-a", "source1-b", "source1-c")
stream2 = ctx.from_values("source2-x", "source2-y", "source2-z")
stream3 = ctx.from_values("source3-1", "source3-2", "source3-3")

# Union all streams
stream1.union(stream2, stream3) \
    .map(lambda x: f"Unified: {x}") \
    .sink(print)

# Union with different processing branches
numbers = ctx.from_collection(range(20))
evens = numbers.filter(lambda x: x % 2 == 0).map(lambda x: f"Even: {x}")
odds = numbers.filter(lambda x: x % 2 == 1).map(lambda x: f"Odd: {x}")

evens.union(odds).sink(print)

ctx.submit("union_operations")
```

## Partitioning Strategies

### Broadcast Partitioning

Send all elements to every parallel instance of the next operator.

```python { .api }
def broadcast(self) -> DataStream:
    """
    Broadcast all elements to every parallel instance.

    Returns:
        DataStream with broadcast partitioning
    """
```

#### Usage Examples

```python
# Broadcast configuration data to all workers
ctx.from_values("config1", "config2", "config3") \
    .broadcast() \
    .map(lambda config: f"All workers got: {config}") \
    .sink(print)

# Broadcast a lookup table
ctx.from_values(("key1", "value1"), ("key2", "value2")) \
    .broadcast() \
    .map(lambda pair: f"Lookup: {pair[0]} -> {pair[1]}") \
    .sink(print)

ctx.submit("broadcast_operations")
```

### Custom Partitioning

Use custom partitioning logic to control data distribution.

```python { .api }
def partition_by(self, partition_func) -> DataStream:
    """
    Partition the stream using a custom partitioning function.

    Args:
        partition_func: Function or Partition instance for custom partitioning

    Returns:
        DataStream with custom partitioning
    """
```

#### Usage Examples

```python
from ray.streaming.partition import Partition

# Hash-based partitioning
def hash_partition(element):
    return hash(str(element)) % 4

ctx.from_collection(range(20)) \
    .partition_by(hash_partition) \
    .map(lambda x: f"Partitioned: {x}") \
    .sink(print)

# Custom partitioning class
class RegionPartition(Partition):
    def partition(self, record, num_partitions):
        region_map = {"US": 0, "EU": 1, "ASIA": 2}
        region = record.get("region", "OTHER")
        return region_map.get(region, 3) % num_partitions

regions_data = [
    {"id": 1, "region": "US", "data": "user1"},
    {"id": 2, "region": "EU", "data": "user2"},
    {"id": 3, "region": "ASIA", "data": "user3"}
]

ctx.from_collection(regions_data) \
    .partition_by(RegionPartition()) \
    .map(lambda record: f"Region {record['region']}: {record['data']}") \
    .sink(print)

ctx.submit("partition_operations")
```
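
Before deploying a partitioning function, it can help to apply it locally and check that every element is routed to a valid partition index. A plain-Python preview, independent of the Ray runtime (`preview_partitions` is an illustrative helper, not a Ray API):

```python
from collections import defaultdict

def hash_partition(element):
    # Same hash-based partitioner as the example above.
    return hash(str(element)) % 4

def preview_partitions(elements, partition_func):
    # Group elements by the partition index the function assigns.
    buckets = defaultdict(list)
    for element in elements:
        buckets[partition_func(element)].append(element)
    return dict(buckets)

buckets = preview_partitions(range(20), hash_partition)
assert set(buckets) <= {0, 1, 2, 3}                  # indices stay in range
assert sum(len(v) for v in buckets.values()) == 20   # every element routed once
```

Note that Python string hashing is randomized per process, so the exact distribution varies between runs; the invariants checked above hold regardless.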

### Forward Partitioning

Forward elements locally to the next operator.

```python { .api }
def forward(self) -> DataStream:
    """
    Forward elements locally to avoid network transfer.

    Returns:
        DataStream with forward partitioning
    """
```

## Output Operations

### Sink Operation

Define output behavior for stream processing results.

```python { .api }
def sink(self, func) -> StreamSink:
    """
    Create a sink for the stream.

    Args:
        func: Function or SinkFunction for handling output

    Returns:
        StreamSink representing the output operation
    """
```

#### Usage Examples

```python
from ray.streaming.function import SinkFunction

# Simple sink with a lambda
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Result: {x}"))

# Custom SinkFunction
class FileSinkFunction(SinkFunction):
    def __init__(self, filename):
        self.filename = filename
        self.file = None

    def open(self, runtime_context):
        self.file = open(self.filename, 'w')

    def sink(self, value):
        self.file.write(f"{value}\n")
        self.file.flush()

    def close(self):
        if self.file:
            self.file.close()

ctx.from_values("line1", "line2", "line3") \
    .sink(FileSinkFunction("output.txt"))

# Database sink simulation
class DatabaseSinkFunction(SinkFunction):
    def open(self, runtime_context):
        print("Connecting to database...")
        self.connection = "mock_db_connection"

    def sink(self, value):
        print(f"INSERT INTO results VALUES ('{value}')")

    def close(self):
        print("Closing database connection")

ctx.from_collection(range(5)) \
    .map(lambda x: f"record_{x}") \
    .sink(DatabaseSinkFunction())

ctx.submit("sink_operations")
```

## Advanced Stream Patterns

### Windowing Simulation

Although Ray Streaming doesn't have built-in windowing, you can simulate time-based processing.

```python
import time
from datetime import datetime

from ray.streaming.function import SourceFunction

# Time-based processing simulation
def timestamped_data():
    for i in range(10):
        yield {"timestamp": datetime.now().isoformat(), "value": i}
        time.sleep(0.5)

class TimestampedSource(SourceFunction):
    def fetch(self, ctx):
        for data in timestamped_data():
            ctx.collect(data)

ctx.from_source(TimestampedSource()) \
    .map(lambda record: f"Time: {record['timestamp']}, Value: {record['value']}") \
    .sink(print)
```
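
Building on timestamped records, a tumbling (fixed, non-overlapping) window can be approximated by bucketing records on their timestamps downstream. A plain-Python sketch of the bucketing step only (`assign_tumbling_window` is an illustrative helper, not a Ray operator):

```python
from datetime import datetime

def assign_tumbling_window(records, window_seconds):
    # Bucket records into fixed, non-overlapping windows keyed by
    # the window start (epoch seconds floored to the window size).
    windows = {}
    for record in records:
        ts = datetime.fromisoformat(record["timestamp"]).timestamp()
        start = int(ts // window_seconds) * window_seconds
        windows.setdefault(start, []).append(record["value"])
    return windows

records = [
    {"timestamp": "2024-01-01T00:00:00", "value": 1},
    {"timestamp": "2024-01-01T00:00:04", "value": 2},
    {"timestamp": "2024-01-01T00:00:06", "value": 3},
]
windows = assign_tumbling_window(records, window_seconds=5)
print(sorted(windows.values()))  # → [[1, 2], [3]]
```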

### Multi-Stream Processing

Process multiple streams with different operations.

```python
# Create multiple processing branches
source = ctx.from_collection(range(100))

# Branch 1: even numbers
evens = source.filter(lambda x: x % 2 == 0) \
    .map(lambda x: f"Even: {x}")

# Branch 2: odd numbers
odds = source.filter(lambda x: x % 2 == 1) \
    .map(lambda x: f"Odd: {x}")

# Branch 3: multiples of 5
fives = source.filter(lambda x: x % 5 == 0) \
    .map(lambda x: f"Multiple of 5: {x}")

# Combine all branches
evens.union(odds, fives) \
    .sink(print)
```

## Performance Considerations

### Operator Chaining

Control operator chaining for performance optimization.

```python
# Disable chaining for better parallelism
stream = ctx.from_collection(large_dataset) \
    .disable_chain() \
    .map(expensive_transformation) \
    .disable_chain() \
    .filter(complex_predicate)

# Use forward for local processing
local_stream = ctx.from_collection(data) \
    .forward() \
    .map(local_transformation)
```

### Parallelism Configuration

Configure parallelism for different operations.

```python
# Different parallelism for different operations
ctx.from_collection(data) \
    .set_parallelism(8) \
    .map(cpu_intensive_function) \
    .key_by(key_function) \
    .set_parallelism(4) \
    .reduce(reduction_function) \
    .set_parallelism(1) \
    .sink(output_function)
```

## See Also

- [Data Streams Documentation](./data-streams.md) - Stream classes and basic transformations
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation
- [Streaming Context Documentation](./streaming-context.md) - StreamingContext and job management
- [Cross-Language Support Documentation](./cross-language.md) - Python/Java integration details