# Streaming Context and Job Management

This document covers the StreamingContext class, which serves as the main entry point for Ray streaming functionality and provides job lifecycle management.

## Overview

The StreamingContext is the primary interface for creating and managing streaming jobs in Ray Streaming. It provides methods for:

- Creating data streams from various sources
- Configuring streaming job parameters
- Submitting jobs for execution
- Managing the streaming job lifecycle

## StreamingContext

The main class for Ray streaming operations, acting as a wrapper around the Java StreamingContext implementation.

### Core API

```python { .api }
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
from ray.streaming.datastream import StreamSource

# Main StreamingContext class
class StreamingContext:
    def __init__(self)

    # Create stream from custom source function
    def source(self, source_func: SourceFunction) -> StreamSource

    # Create stream from multiple values
    def from_values(self, *values) -> StreamSource

    # Create stream from collection/list
    def from_collection(self, values) -> StreamSource

    # Create stream from text file (line by line)
    def read_text_file(self, filename: str) -> StreamSource

    # Submit job for execution
    def submit(self, job_name: str) -> None
```

### StreamingContext.Builder

Configuration builder for creating StreamingContext instances with custom settings.

```python { .api }
# Builder is a nested class, accessed as StreamingContext.Builder
class Builder:
    def __init__(self)

    # Set a single option (key/value) or several at once (conf dict)
    def option(self, key=None, value=None, conf=None) -> "Builder"

    # Create configured StreamingContext
    def build(self) -> StreamingContext
```

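For intuition, `option()` accepts either a single key/value pair or a whole `conf` dict, and returns the builder so calls can be chained. The following toy class models that pattern; it is illustrative only and is not Ray's actual implementation:

```python
class MiniBuilder:
    """Toy model of StreamingContext.Builder's option() semantics (not Ray's code)."""

    def __init__(self):
        self._conf = {}

    def option(self, key=None, value=None, conf=None):
        if conf is not None:
            self._conf.update(conf)      # bulk form: merge a whole dict of options
        if key is not None:
            self._conf[key] = value      # single form: one key/value pair
        return self                      # return self to allow method chaining

    def build(self):
        return dict(self._conf)


merged = MiniBuilder().option("streaming.worker-num", "4") \
                      .option(conf={"streaming.queue.capacity": "1000"}) \
                      .build()
print(merged)  # both the single option and the dict option are merged
```

Later calls overwrite earlier ones for the same key, which is the usual builder convention.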
## Capabilities

### Context Creation and Configuration

Create and configure streaming contexts with various options.

```python { .api }
from ray.streaming import StreamingContext

# Simple context creation
ctx = StreamingContext()

# Context with configuration
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .build()

# Multiple configuration options
config = {
    "streaming.checkpoint.interval": "5000",
    "streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
    .option(conf=config) \
    .build()
```

### Data Source Creation

Create data streams from various input sources.

```python { .api }
# Create stream from values
stream = ctx.from_values("hello", "world", "ray", "streaming")

# Create stream from collection
data = [1, 2, 3, 4, 5]
stream = ctx.from_collection(data)

# Create stream from text file
stream = ctx.read_text_file("input.txt")

# Create stream from custom source function
from ray.streaming.function import SourceFunction

class MySource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        pass

    def fetch(self, collector):
        for i in range(100):
            collector.collect(f"item-{i}")

stream = ctx.source(MySource())
```

### Job Submission and Execution

Submit streaming jobs for execution on the Ray cluster.

```python { .api }
# Build streaming pipeline
ctx.from_values(1, 2, 3, 4, 5) \
    .map(lambda x: x * 2) \
    .filter(lambda x: x > 4) \
    .sink(lambda x: print(f"Result: {x}"))

# Submit job with descriptive name
ctx.submit("data_processing_job")
```

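For intuition, the `map` and `filter` stages in the pipeline above compute the following, shown here in plain Python without Ray:

```python
values = [1, 2, 3, 4, 5]

# map(lambda x: x * 2): each element is doubled
doubled = [x * 2 for x in values]

# filter(lambda x: x > 4): only elements greater than 4 survive
results = [x for x in doubled if x > 4]

print(results)  # each surviving element would reach the sink: [6, 8, 10]
```

In the streaming job these stages run element by element across workers rather than over a materialized list, but the per-element logic is identical.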
## Usage Examples

### Basic Streaming Job

```python
import ray
from ray.streaming import StreamingContext

# Initialize Ray
ray.init()

# Create streaming context
ctx = StreamingContext.Builder().build()

# Create and process stream
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x ** 2) \
    .filter(lambda x: x > 10) \
    .sink(lambda x: print(f"Large square: {x}"))

# Submit job
ctx.submit("squares_job")
```

### Word Count Example

```python
import ray
from ray.streaming import StreamingContext

ray.init()
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "2") \
    .build()

# Process text file
ctx.read_text_file("document.txt") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Word: {result[0]}, Count: {result[1]}"))

ctx.submit("word_count")
```

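The `key_by`/`reduce` stages above maintain a running count per key: records are partitioned by word, and `reduce` folds each new `(word, 1)` pair into the accumulated total for that word. A pure-Python model of the same aggregation (no Ray needed) makes the semantics concrete:

```python
from collections import defaultdict

def word_count(lines):
    """Pure-Python model of the flat_map/map/key_by/reduce stages above."""
    counts = defaultdict(int)
    for line in lines:                 # flat_map: split each line into words
        for word in line.split():
            counts[word.lower()] += 1  # key_by + reduce: running per-key sum
    return dict(counts)

print(word_count(["Ray streaming counts words", "ray counts fast"]))
# → {'ray': 2, 'streaming': 1, 'counts': 2, 'words': 1, 'fast': 1}
```

In the streaming job each intermediate total is emitted downstream as it updates, so the sink sees a sequence of running counts rather than one final table.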
### Custom Source Function

```python
import ray
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class TimestampSource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.count = 0
        self.max_count = 10

    def fetch(self, collector):
        while self.count < self.max_count:
            timestamp = int(time.time())
            collector.collect(f"timestamp-{timestamp}-{self.count}")
            self.count += 1
            time.sleep(1)

ray.init()
ctx = StreamingContext.Builder().build()

ctx.source(TimestampSource()) \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(f"Processed: {x}"))

ctx.submit("timestamp_job")
```

### Configuration Options

Ray Streaming supports various configuration options for job tuning:

```python
# Performance tuning
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "8") \
    .option("streaming.queue.capacity", "2000") \
    .option("streaming.checkpoint.interval", "10000") \
    .build()

# Backend configuration
ctx = StreamingContext.Builder() \
    .option("streaming.context.backend.type", "LOCAL_FILE") \
    .option("streaming.context.backend.path", "/tmp/streaming") \
    .build()

# Multiple options via dictionary
config = {
    "streaming.worker-num": "4",
    "streaming.context.backend.type": "MEMORY",
    "streaming.checkpoint.interval": "5000",
    "streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
    .option(conf=config) \
    .build()
```

## Integration with Ray

StreamingContext integrates seamlessly with Ray's distributed computing capabilities:

- **Ray Actors**: Streaming workers run as Ray actors for distributed processing
- **Ray Dashboard**: Job monitoring and metrics available through Ray Dashboard
- **Resource Management**: Leverages Ray's resource allocation and scheduling
- **Fault Tolerance**: Built on Ray's actor supervision and recovery mechanisms

## Advanced Usage

### Error Handling

```python
try:
    # The division by zero below will fail once the job runs
    ctx.from_collection([1, 2, 3]) \
        .map(lambda x: x / 0) \
        .sink(print)
    ctx.submit("error_job")
except Exception as e:
    print(f"Job failed: {e}")
```

### Resource Configuration

```python
# Configure resources for streaming job
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.worker-cpu", "2") \
    .option("streaming.worker-memory", "2GB") \
    .build()
```

## Best Practices

1. **Resource Planning**: Configure worker count based on data volume and processing complexity
2. **Checkpoint Intervals**: Choose checkpoint intervals that balance fault tolerance against performance
3. **Parallelism**: Use `set_parallelism()` on streams to control processing parallelism
4. **Error Handling**: Implement proper error handling in user functions
5. **Resource Cleanup**: Ensure the Ray cluster is properly shut down after job completion

## See Also

- [Data Streams Documentation](./data-streams.md) - Stream transformation operations
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation
- [Stream Operations Documentation](./stream-operations.md) - Available stream transformations