Ray Streaming - a distributed stream processing framework built on Ray, providing fault tolerance and a Python API
npx @tessl/cli install tessl/pypi-ray-streaming@1.10.00
# Ray Streaming

A distributed stream processing framework built on Ray that provides fault-tolerant, scalable stream processing with a Python API and cross-language support.

## Package Information

- **Package Name**: ray (streaming module)
- **Package Type**: PyPI
- **Language**: Python
- **Installation**: `pip install ray`
## Overview

Ray Streaming provides a distributed stream processing framework built on Ray's actor model. It enables fault-tolerant stream processing with automatic checkpointing, resource management, and cross-language integration between Python and Java workloads.

**Key Features:**
- Distributed stream processing with fault tolerance
- Single-node failover mechanism for fast recovery
- Cross-language support (Python and Java operators)
- Built-in checkpointing and state management
- Integration with Ray's distributed computing capabilities
## Core Imports

```python
from ray.streaming import StreamingContext
from ray.streaming.datastream import DataStream, StreamSource
from ray.streaming.function import SourceFunction, CollectionSourceFunction
```
## Basic Usage

### Simple Word Count Example

```python
import ray
from ray.streaming import StreamingContext

# Initialize Ray and create streaming context
ray.init()
ctx = StreamingContext.Builder().build()

# Create a simple streaming pipeline
ctx.read_text_file("input.txt") \
    .set_parallelism(1) \
    .flat_map(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .key_by(lambda x: x[0]) \
    .reduce(lambda old_value, new_value:
            (old_value[0], old_value[1] + new_value[1])) \
    .filter(lambda x: "ray" not in x) \
    .sink(lambda x: print("result", x))

# Submit the job
ctx.submit("word_count_job")
```
### Creating Data Streams from Collections

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Create stream from values
ctx.from_values("a", "b", "c") \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(x))

# Create stream from collection
data = [1, 2, 3, 4, 5]
ctx.from_collection(data) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Even doubled: {x}"))

ctx.submit("collection_processing")
```
## Architecture

Ray Streaming implements a master-worker architecture using Ray actors:

### Core Components
- **StreamingContext**: Main entry point for creating and configuring streaming jobs
- **DataStream**: Represents a stream of data elements that can be transformed
- **StreamSource**: Entry point for data ingestion into the streaming pipeline
- **Operators**: Transformation functions (map, filter, reduce, etc.) applied to streams
- **Internal Runtime**: Java-based execution engine handling distributed processing, fault tolerance, and resource management

### Fault Tolerance
Ray Streaming provides automatic fault tolerance through:
- Periodic checkpointing of operator state
- Single-node failover that restarts only the failed components
- Automatic recovery from the last successful checkpoint
- Message replay from upstream operators during recovery
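The checkpoint-and-replay idea can be sketched in plain Python. This is a conceptual illustration of the recovery sequence, not Ray's actual runtime; `CountingOperator`, `checkpoint`, and `replay_buffer` are illustrative names:

```python
import copy

class CountingOperator:
    """Toy stateful operator that counts occurrences per key."""
    def __init__(self):
        self.state = {}

    def process(self, record):
        self.state[record] = self.state.get(record, 0) + 1

checkpoint = None    # last durable snapshot of operator state
replay_buffer = []   # records received since the last checkpoint

op = CountingOperator()
for i, record in enumerate(["a", "b", "a", "c"]):
    op.process(record)
    replay_buffer.append(record)
    if i == 1:  # periodic checkpoint after the first two records
        checkpoint = copy.deepcopy(op.state)
        replay_buffer.clear()

# Simulated failure: restart only this operator, restore the last
# checkpoint, then replay the records received since it was taken.
op = CountingOperator()
op.state = copy.deepcopy(checkpoint)
for record in replay_buffer:
    op.process(record)

print(op.state)  # {'a': 2, 'b': 1, 'c': 1}
```

The key property the sketch shows is single-node failover: only the failed operator is rebuilt, and upstream replay closes the gap between the checkpoint and the failure point.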
## Capabilities

### Streaming Context and Job Management

Main entry point for Ray streaming functionality with job lifecycle management.

```python { .api }
class StreamingContext:
    class Builder:
        def option(self, key=None, value=None, conf=None) -> Builder
        def build(self) -> StreamingContext

    def source(self, source_func: SourceFunction) -> StreamSource
    def from_values(self, *values) -> StreamSource
    def from_collection(self, values) -> StreamSource
    def read_text_file(self, filename: str) -> StreamSource
    def submit(self, job_name: str) -> None
```

[→ Streaming Context Documentation](./streaming-context.md)
### Data Streams and Transformations

Stream transformation operations and data flow management.

```python { .api }
class DataStream:
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    def key_by(self, func) -> KeyDataStream
    def union(self, *others) -> DataStream
    def sink(self, func) -> DataStreamSink
    def set_parallelism(self, parallelism: int) -> DataStream
```

[→ Data Streams Documentation](./data-streams.md)
### Source Functions and Data Ingestion

Data source implementations and custom source function interfaces.

```python { .api }
class SourceFunction:
    def init(self, parallel_id: int, num_parallel: int) -> None
    def fetch(self, collector) -> None

class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)

class LocalFileSourceFunction(SourceFunction):
    def __init__(self, filename: str)
```

[→ Source Functions Documentation](./source-functions.md)
### Stream Processing Operations

Core stream transformation operators and windowing functions.

```python { .api }
# Transformation Operations
DataStream.map(func)              # One-to-one transformation
DataStream.flat_map(func)         # One-to-many transformation
DataStream.filter(func)           # Element filtering
DataStream.union(*others)         # Stream union

# Keyed Operations
KeyDataStream.reduce(func)        # Stateful reduction
KeyDataStream.window(window_func) # Windowing operations
```

[→ Stream Operations Documentation](./stream-operations.md)
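The semantics of these operators can be illustrated with plain Python over a finite batch. This is a conceptual sketch of what each operator computes, not how Ray Streaming executes it; the helper function names are illustrative:

```python
# Toy batch analogues of the streaming operators above.
def map_op(stream, func):
    return [func(x) for x in stream]          # one output per input

def flat_map_op(stream, func):
    return [y for x in stream for y in func(x)]  # zero or more outputs per input

def filter_op(stream, pred):
    return [x for x in stream if pred(x)]     # keep elements passing the predicate

def key_reduce_op(stream, key_func, reduce_func):
    # Stateful per-key reduction, like key_by(...).reduce(...)
    state = {}
    for x in stream:
        k = key_func(x)
        state[k] = reduce_func(state[k], x) if k in state else x
    return list(state.values())

lines = ["ray streaming", "streaming on ray"]
words = flat_map_op(lines, str.split)
pairs = map_op(words, lambda w: (w, 1))
counts = key_reduce_op(pairs, lambda p: p[0],
                       lambda old, new: (old[0], old[1] + new[1]))
print(sorted(counts))  # [('on', 1), ('ray', 2), ('streaming', 2)]
```

In the real API the same chain runs continuously over an unbounded stream, with the per-key reduction state held by the operator and checkpointed by the runtime.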
### Cross-Language Integration

Support for mixed Python/Java streaming applications.

```python { .api }
# Convert to Java stream for Java operators
DataStream.as_java_stream()

# Use Java operators with class names
java_stream.map("com.example.MyMapper")
java_stream.filter("com.example.MyFilter")

# Convert back to Python stream
java_stream.as_python_stream()
```

[→ Cross-Language Support Documentation](./cross-language.md)
## Advanced Features

### Custom Source Functions

Create custom data sources by implementing the SourceFunction interface:

```python
from ray.streaming.function import SourceFunction

class CustomSourceFunction(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.counter = 0
        self.max_count = 1000

    def fetch(self, collector):
        if self.counter < self.max_count:
            collector.collect(f"message-{self.counter}")
            self.counter += 1
        else:
            # Signal end of stream
            collector.close()
```
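The init/fetch contract can be exercised by hand, without a running cluster, by driving a source with a stub collector. `StubCollector` and `CountingSource` below are illustrative stand-ins for testing the pattern, not Ray APIs:

```python
class StubCollector:
    """Hypothetical stand-in for the runtime's collector."""
    def __init__(self):
        self.records = []
        self.closed = False

    def collect(self, value):
        self.records.append(value)

    def close(self):
        self.closed = True

class CountingSource:
    """Implements the init/fetch contract of SourceFunction."""
    def init(self, parallel_id, num_parallel):
        self.counter = 0
        self.max_count = 3

    def fetch(self, collector):
        if self.counter < self.max_count:
            collector.collect(f"message-{self.counter}")
            self.counter += 1
        else:
            collector.close()

# Drive the source the way a runtime loop might: init once,
# then call fetch repeatedly until the source closes the stream.
source, collector = CountingSource(), StubCollector()
source.init(parallel_id=0, num_parallel=1)
while not collector.closed:
    source.fetch(collector)

print(collector.records)  # ['message-0', 'message-1', 'message-2']
```

In a real job the source would instead be passed to `ctx.source(...)` and the runtime would supply the collector.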
### Configuration Options

Configure streaming jobs with various options:

```python
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .option("streaming.checkpoint.interval", "5000") \
    .build()
```
### Error Handling and Monitoring

Ray Streaming integrates with Ray's monitoring and error handling:
- Built-in metrics collection and reporting
- Integration with the Ray Dashboard for job monitoring
- Automatic error recovery and notification
- Configurable retry policies and failure handling
## Getting Started

1. **Install Ray**: `pip install ray`
2. **Initialize Ray**: `ray.init()`
3. **Create Context**: `ctx = StreamingContext.Builder().build()`
4. **Build Pipeline**: Chain stream operations using the fluent API
5. **Submit Job**: `ctx.submit("job_name")`

For detailed examples and API references, see the individual capability documentation pages linked above.