# Data Streams and Transformations

This document covers the data stream classes and transformation operations in Ray Streaming, including DataStream, KeyDataStream, and their Java counterparts for cross-language integration.

## Overview

Ray Streaming provides a fluent API for stream transformations through several stream classes:

- **DataStream**: Main stream class for Python-based transformations
- **KeyDataStream**: Keyed stream for stateful operations like reduce
- **StreamSource**: Entry point streams from data sources
- **UnionStream**: Result of union operations between multiple streams
- **JavaDataStream**: Java-based stream for cross-language operations

## Stream Base Class

All stream classes inherit from the abstract Stream base class.

### Core API

```python { .api }
from ray.streaming.datastream import Stream

class Stream:
    def get_parallelism(self) -> int
    def set_parallelism(self, parallelism: int) -> Stream
    def get_id(self) -> str
    def with_config(self, key=None, value=None, conf=None) -> Stream
    def get_config(self) -> dict
    def forward(self) -> Stream
    def disable_chain(self) -> Stream
    def get_input_stream(self) -> Stream
    def get_streaming_context(self) -> StreamingContext
```

## DataStream

The main stream class for Python-based stream processing and transformations.

### Core API

```python { .api }
from ray.streaming.datastream import DataStream

class DataStream(Stream):
    # Element-wise transformations
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream

    # Stream composition
    def union(self, *streams) -> UnionStream

    # Partitioning and keying
    def key_by(self, func) -> KeyDataStream
    def broadcast(self) -> DataStream
    def partition_by(self, partition_func) -> DataStream

    # Output operations
    def sink(self, func) -> StreamSink

    # Cross-language integration
    def as_java_stream(self) -> JavaDataStream
```

## Capabilities

### Element-wise Transformations

Apply functions to individual elements in the stream.

```python { .api }
# Map transformation - one-to-one element transformation
def map(self, func) -> DataStream:
    """
    Transform each element using the provided function.

    Args:
        func: Function or MapFunction instance for transformation

    Returns:
        New DataStream with transformed elements
    """

# FlatMap transformation - one-to-many element transformation
def flat_map(self, func) -> DataStream:
    """
    Transform each element into zero or more output elements.

    Args:
        func: Function or FlatMapFunction that returns an iterable

    Returns:
        New DataStream with flattened results
    """

# Filter transformation - element filtering
def filter(self, func) -> DataStream:
    """
    Keep only elements that satisfy the predicate.

    Args:
        func: Function or FilterFunction returning a boolean

    Returns:
        New DataStream with filtered elements
    """
```

### Usage Examples

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Map transformation
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Doubled: {x}"))

# FlatMap transformation
ctx.from_values("hello world", "ray streaming") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: word.upper()) \
    .sink(lambda x: print(f"Word: {x}"))

# Filter transformation
ctx.from_collection(range(10)) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Even square: {x}"))
```
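
For intuition, the semantics of these three operators can be sketched with plain Python iterables. This sketch runs without Ray, and the helper names (`map_op`, `flat_map_op`, `filter_op`) are illustrative, not part of the API:

```python
from itertools import chain

def map_op(func, elements):
    # map: exactly one output element per input element
    return [func(x) for x in elements]

def flat_map_op(func, elements):
    # flat_map: each input yields zero or more outputs, flattened into one stream
    return list(chain.from_iterable(func(x) for x in elements))

def filter_op(pred, elements):
    # filter: keep only elements for which the predicate is true
    return [x for x in elements if pred(x)]

print(map_op(lambda x: x * 2, [1, 2, 3]))              # [2, 4, 6]
print(flat_map_op(str.split, ["hello world", "ray"]))  # ['hello', 'world', 'ray']
print(filter_op(lambda x: x % 2 == 0, range(6)))       # [0, 2, 4]
```

In a real job these operators are applied per record as data flows through the pipeline, rather than over a materialized list.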

### Stream Composition

Combine multiple streams into unified processing pipelines.

```python { .api }
def union(self, *streams) -> UnionStream:
    """
    Merge multiple streams of the same type.

    Args:
        *streams: DataStreams to union with this stream

    Returns:
        UnionStream containing elements from all input streams
    """
```

### Usage Example

```python
# Union multiple streams
stream1 = ctx.from_values(1, 2, 3)
stream2 = ctx.from_values(4, 5, 6)
stream3 = ctx.from_values(7, 8, 9)

unified = stream1.union(stream2, stream3) \
    .map(lambda x: x * 10) \
    .sink(lambda x: print(f"Unified: {x}"))
```
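
The merged stream carries every element from all inputs; ordering across the input streams is not guaranteed. As a plain-Python sketch (the helper name is illustrative):

```python
from itertools import chain

def union_op(*streams):
    # union: combine the elements of all input streams into one stream;
    # in a running job, elements from different inputs interleave arbitrarily
    return list(chain(*streams))

print(union_op([1, 2, 3], [4, 5, 6], [7, 8, 9]))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```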

### Partitioning and Keying

Control data distribution and enable stateful operations.

```python { .api }
def key_by(self, func) -> KeyDataStream:
    """
    Partition stream by key for stateful operations.

    Args:
        func: Function or KeyFunction to extract key from elements

    Returns:
        KeyDataStream partitioned by the key function
    """

def broadcast(self) -> DataStream:
    """
    Broadcast all elements to every parallel instance.

    Returns:
        DataStream with broadcast partitioning
    """

def partition_by(self, partition_func) -> DataStream:
    """
    Partition stream using custom partitioning function.

    Args:
        partition_func: Function or Partition instance for custom partitioning

    Returns:
        DataStream with custom partitioning
    """
```

### Usage Examples

```python
# Key-based processing
ctx.from_values(("a", 1), ("b", 2), ("a", 3), ("b", 4)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda x: print(f"Sum for {x[0]}: {x[1]}"))

# Broadcast partitioning
ctx.from_values("broadcast", "message") \
    .broadcast() \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(f"Broadcast: {x}"))

# Custom partitioning
def custom_partition(element):
    return hash(element) % 4

ctx.from_collection(range(20)) \
    .partition_by(custom_partition) \
    .map(lambda x: f"Partition-{x}") \
    .sink(print)
```
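
Conceptually, `key_by` routes every element to the parallel instance responsible for its key, so all elements sharing a key are processed by the same instance. The sketch below illustrates that routing in plain Python; the hash-modulo assignment is illustrative, not necessarily the exact scheme Ray Streaming uses:

```python
def route_by_key(elements, key_func, num_instances):
    # Assign each element to an instance bucket based on its key;
    # elements with equal keys always land in the same bucket.
    buckets = [[] for _ in range(num_instances)]
    for element in elements:
        index = hash(key_func(element)) % num_instances
        buckets[index].append(element)
    return buckets

pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]
buckets = route_by_key(pairs, lambda p: p[0], 2)
# All ("a", ...) pairs share one bucket; all ("b", ...) pairs share one bucket
```

This co-location of same-key elements is what makes the stateful per-key operations of KeyDataStream possible.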

## KeyDataStream

Specialized stream for keyed operations that maintain state per key.

### Core API

```python { .api }
from ray.streaming.datastream import KeyDataStream

class KeyDataStream(DataStream):
    def reduce(self, func) -> DataStream
    def as_java_stream(self) -> JavaKeyDataStream
```

### Stateful Operations

```python { .api }
def reduce(self, func) -> DataStream:
    """
    Apply the reduce function to elements with the same key.

    Args:
        func: Function or ReduceFunction for combining values

    Returns:
        DataStream with reduced values per key
    """
```

### Usage Examples

```python
# Word count with reduce
ctx.read_text_file("document.txt") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"{result[0]}: {result[1]}"))

# Sum by category
data = [("fruits", 10), ("vegetables", 5), ("fruits", 7), ("vegetables", 3)]
ctx.from_collection(data) \
    .key_by(lambda item: item[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Total {result[0]}: {result[1]}"))
```
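
In a streaming reduce, the updated aggregate for a key is emitted each time a new element with that key arrives, not just once at the end. The per-key accumulation can be sketched in plain Python (illustrative helper, not the actual runtime):

```python
def keyed_reduce(elements, key_func, reduce_func):
    # Maintain one accumulated value per key; each incoming element is
    # combined with the current value for its key, and the updated value
    # is emitted downstream.
    state = {}
    emitted = []
    for element in elements:
        key = key_func(element)
        if key in state:
            state[key] = reduce_func(state[key], element)
        else:
            state[key] = element
        emitted.append(state[key])
    return emitted

data = [("fruits", 10), ("vegetables", 5), ("fruits", 7)]
results = keyed_reduce(data, lambda p: p[0],
                       lambda old, new: (old[0], old[1] + new[1]))
print(results)  # [('fruits', 10), ('vegetables', 5), ('fruits', 17)]
```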

## StreamSource

Entry point streams created from data sources.

### Core API

```python { .api }
from ray.streaming.datastream import StreamSource

class StreamSource(DataStream):
    @staticmethod
    def build_source(streaming_context, func) -> StreamSource
```

StreamSource inherits all DataStream transformation methods and serves as the starting point for stream processing pipelines.

### Usage Examples

```python
from ray.streaming.function import SourceFunction

class NumberSource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        # Each parallel instance starts at its own offset and strides by
        # the total parallelism, so the instances emit disjoint numbers.
        self.current = parallel_id
        self.step = num_parallel
        self.max_num = 100

    def fetch(self, collector):
        while self.current < self.max_num:
            collector.collect(self.current)
            self.current += self.step

# Create a stream from the custom source
source_stream = StreamSource.build_source(ctx, NumberSource())
source_stream.map(lambda x: x * 2) \
    .filter(lambda x: x > 10) \
    .sink(print)
```

## Cross-Language Integration

Ray Streaming supports mixed Python/Java operations through stream conversion.

### JavaDataStream

Java-based stream for cross-language operations.

```python { .api }
from ray.streaming.datastream import JavaDataStream

class JavaDataStream(Stream):
    # Java operator methods (require Java class names)
    def map(self, java_func_class) -> JavaDataStream
    def flat_map(self, java_func_class) -> JavaDataStream
    def filter(self, java_func_class) -> JavaDataStream
    def union(self, *streams) -> JavaUnionStream
    def key_by(self, java_func_class) -> JavaKeyDataStream
    def sink(self, java_func_class) -> JavaStreamSink

    # Convert back to a Python stream
    def as_python_stream(self) -> DataStream
```

### Usage Examples

```python
# Mixed Python/Java processing
python_stream = ctx.from_values("hello", "world", "ray") \
    .map(lambda x: x.upper())

# Convert to Java for Java operators
java_stream = python_stream.as_java_stream() \
    .map("com.example.JavaMapperFunction") \
    .filter("com.example.JavaFilterFunction")

# Convert back to Python
result_stream = java_stream.as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)
```

## Stream Configuration

Configure stream behavior using the configuration system.

### Stream-Level Configuration

```python
# Configure individual streams
stream = ctx.from_collection(range(100)) \
    .set_parallelism(4) \
    .with_config("streaming.buffer.size", "1024") \
    .with_config("streaming.timeout", "5000")

# Multiple configuration options
config = {
    "streaming.checkpoint.interval": "10000",
    "streaming.queue.capacity": "500"
}
configured_stream = stream.with_config(conf=config)
```

### Performance Optimization

```python
# Optimize for throughput
high_throughput_stream = ctx.from_collection(large_dataset) \
    .set_parallelism(8) \
    .with_config("streaming.buffer.size", "4096") \
    .disable_chain()  # Prevent operator chaining

# Optimize for latency
low_latency_stream = ctx.from_collection(data) \
    .set_parallelism(1) \
    .with_config("streaming.timeout", "100") \
    .forward()  # Local forwarding
```

## Advanced Patterns

### Pipeline Branching

```python
# Split stream into multiple processing branches
source = ctx.from_collection(range(100))

# Branch 1: Even numbers
evens = source.filter(lambda x: x % 2 == 0) \
    .map(lambda x: f"Even: {x}")

# Branch 2: Odd numbers
odds = source.filter(lambda x: x % 2 == 1) \
    .map(lambda x: f"Odd: {x}")

# Merge branches
evens.union(odds).sink(print)
```

### Stateful Processing

```python
# Maintain running totals per key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda running_total, new_value:
            (running_total[0], running_total[1] + new_value[1])) \
    .sink(lambda result: print(f"Running total for {result[0]}: {result[1]}"))
```

## Error Handling

Handle errors in stream processing operations.

```python
def safe_transform(x):
    try:
        return int(x) * 2
    except ValueError:
        return 0  # Default value for invalid input

ctx.from_values("1", "2", "invalid", "4") \
    .map(safe_transform) \
    .filter(lambda x: x > 0) \
    .sink(lambda x: print(f"Valid result: {x}"))
```

## See Also

- [Streaming Context Documentation](./streaming-context.md) - StreamingContext and job management
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation
- [Stream Operations Documentation](./stream-operations.md) - Detailed transformation operations
- [Cross-Language Support Documentation](./cross-language.md) - Python/Java integration details