# Streaming Context and Job Management

This document covers the StreamingContext class, which serves as the main entry point for Ray Streaming functionality and provides job lifecycle management.

## Overview

The StreamingContext is the primary interface for creating and managing streaming jobs in Ray Streaming. It provides methods for:

- Creating data streams from various sources
- Configuring streaming job parameters
- Submitting jobs for execution
- Managing the streaming job lifecycle

## StreamingContext

The main class for Ray streaming operations, acting as a wrapper around the Java StreamingContext implementation.

### Core API

```python { .api }
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
from ray.streaming.datastream import StreamSource

# Main StreamingContext class
class StreamingContext:
    def __init__(self)

    # Create stream from custom source function
    def source(self, source_func: SourceFunction) -> StreamSource

    # Create stream from multiple values
    def from_values(self, *values) -> StreamSource

    # Create stream from collection/list
    def from_collection(self, values) -> StreamSource

    # Create stream from text file (line by line)
    def read_text_file(self, filename: str) -> StreamSource

    # Submit job for execution
    def submit(self, job_name: str) -> None
```

### StreamingContext.Builder

Configuration builder for creating StreamingContext instances with custom settings.

```python { .api }
class StreamingContext.Builder:
    def __init__(self)

    # Set configuration option(s)
    def option(self, key=None, value=None, conf=None) -> Builder

    # Create configured StreamingContext
    def build(self) -> StreamingContext
```

## Capabilities

### Context Creation and Configuration

Create and configure streaming contexts with various options.

```python { .api }
from ray.streaming import StreamingContext

# Simple context creation
ctx = StreamingContext()

# Context with configuration
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .build()

# Multiple configuration options
config = {
    "streaming.checkpoint.interval": "5000",
    "streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
    .option(conf=config) \
    .build()
```

### Data Source Creation

Create data streams from various input sources.

```python { .api }
# Create stream from values
stream = ctx.from_values("hello", "world", "ray", "streaming")

# Create stream from collection
data = [1, 2, 3, 4, 5]
stream = ctx.from_collection(data)

# Create stream from text file
stream = ctx.read_text_file("input.txt")

# Create stream from custom source function
from ray.streaming.function import SourceFunction

class MySource(SourceFunction):
    def fetch(self, collector):
        for i in range(100):
            collector.collect(f"item-{i}")

stream = ctx.source(MySource())
```

### Job Submission and Execution

Submit streaming jobs for execution on the Ray cluster.

```python { .api }
# Build streaming pipeline
ctx.from_values(1, 2, 3, 4, 5) \
    .map(lambda x: x * 2) \
    .filter(lambda x: x > 4) \
    .sink(lambda x: print(f"Result: {x}"))

# Submit job with descriptive name
ctx.submit("data_processing_job")
```

## Usage Examples

### Basic Streaming Job

```python
import ray
from ray.streaming import StreamingContext

# Initialize Ray
ray.init()

# Create streaming context
ctx = StreamingContext.Builder().build()

# Create and process stream
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x ** 2) \
    .filter(lambda x: x > 10) \
    .sink(lambda x: print(f"Large square: {x}"))

# Submit job
ctx.submit("squares_job")
```

### Word Count Example

```python
import ray
from ray.streaming import StreamingContext

ray.init()
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "2") \
    .build()

# Process text file
ctx.read_text_file("document.txt") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Word: {result[0]}, Count: {result[1]}"))

ctx.submit("word_count")
```

### Custom Source Function

```python
import ray
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class TimestampSource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.count = 0
        self.max_count = 10

    def fetch(self, collector):
        while self.count < self.max_count:
            timestamp = int(time.time())
            collector.collect(f"timestamp-{timestamp}-{self.count}")
            self.count += 1
            time.sleep(1)

ray.init()
ctx = StreamingContext.Builder().build()

ctx.source(TimestampSource()) \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(f"Processed: {x}"))

ctx.submit("timestamp_job")
```

### Configuration Options

Ray Streaming supports various configuration options for job tuning:

```python
# Performance tuning
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "8") \
    .option("streaming.queue.capacity", "2000") \
    .option("streaming.checkpoint.interval", "10000") \
    .build()

# Backend configuration
ctx = StreamingContext.Builder() \
    .option("streaming.context.backend.type", "LOCAL_FILE") \
    .option("streaming.context.backend.path", "/tmp/streaming") \
    .build()

# Multiple options via dictionary
config = {
    "streaming.worker-num": "4",
    "streaming.context.backend.type": "MEMORY",
    "streaming.checkpoint.interval": "5000",
    "streaming.queue.capacity": "1000"
}
ctx = StreamingContext.Builder() \
    .option(conf=config) \
    .build()
```

## Integration with Ray

StreamingContext integrates seamlessly with Ray's distributed computing capabilities:

- **Ray Actors**: Streaming workers run as Ray actors for distributed processing
- **Ray Dashboard**: Job monitoring and metrics available through Ray Dashboard
- **Resource Management**: Leverages Ray's resource allocation and scheduling (see the sketch after this list)
- **Fault Tolerance**: Built on Ray's actor supervision and recovery mechanisms
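
The sketch below illustrates the resource-management point: it assumes a Ray cluster is already running and connects the driver to it with the standard `ray.init(address="auto")` call so that streaming workers are scheduled on cluster resources. The option value and job name are illustrative.

```python
import ray
from ray.streaming import StreamingContext

# Connect to an existing Ray cluster instead of starting a local one,
# so streaming workers (Ray actors) run on the cluster's resources.
ray.init(address="auto")

ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .build()

ctx.from_values("a", "b", "c") \
    .sink(lambda x: print(x))

ctx.submit("cluster_job")
```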

## Advanced Usage

### Error Handling

```python
try:
    # The map below divides by zero, so the pipeline will fail
    ctx.from_collection([1, 2, 3]) \
        .map(lambda x: x / 0) \
        .sink(print)
    ctx.submit("error_job")
except Exception as e:
    print(f"Job failed: {e}")
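
Because transformations execute inside distributed workers, a failure in a user function may not surface synchronously at `submit()` time. The following is a minimal sketch of guarding the user function itself, using only the operations shown above; the `safe_divide` helper and its fallback value of `0` are illustrative assumptions, not part of the Ray Streaming API.

```python
def safe_divide(x):
    # Guard the user function so a single bad record does not fail the operator.
    try:
        return 100 / x
    except ZeroDivisionError:
        return 0  # illustrative fallback; choose what fits your pipeline

ctx.from_collection([0, 1, 2, 4]) \
    .map(safe_divide) \
    .sink(lambda x: print(f"Result: {x}"))

ctx.submit("defensive_job")
```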

### Resource Configuration

```python
# Configure resources for streaming job
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.worker-cpu", "2") \
    .option("streaming.worker-memory", "2GB") \
    .build()
```

## Best Practices

1. **Resource Planning**: Configure worker count based on data volume and processing complexity
2. **Checkpoint Intervals**: Set checkpoint intervals that balance fault-tolerance guarantees against checkpointing overhead
3. **Parallelism**: Use `set_parallelism()` on streams to control processing parallelism (see the sketch after this list)
4. **Error Handling**: Implement proper error handling in user functions
5. **Resource Cleanup**: Ensure the Ray cluster is properly shut down after job completion
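
A minimal sketch combining practices 3 and 5, assuming the `set_parallelism()` stream method referenced above and the standard `ray.shutdown()` cleanup call; the parallelism value, checkpoint interval, and job name are illustrative.

```python
import ray
from ray.streaming import StreamingContext

ray.init()

ctx = StreamingContext.Builder() \
    .option("streaming.checkpoint.interval", "5000") \
    .build()

squares = ctx.from_collection(list(range(100))) \
    .map(lambda x: x * x)
squares.set_parallelism(4)   # practice 3: control operator parallelism
squares.sink(lambda x: print(x))

ctx.submit("best_practices_job")

# Practice 5: once the job has completed (or the cluster is no longer needed),
# shut Ray down to release resources.
ray.shutdown()
```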

## See Also

- [Data Streams Documentation](./data-streams.md) - Stream transformation operations
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation
- [Stream Operations Documentation](./stream-operations.md) - Available stream transformations