# Ray Streaming

A distributed stream processing framework built on Ray that provides fault-tolerant, scalable stream processing with a Python API and cross-language support.

## Package Information

- **Package Name**: ray (streaming module)
- **Package Type**: PyPI
- **Language**: Python
- **Installation**: `pip install ray`

## Overview

Ray Streaming provides a distributed stream processing framework built on Ray's actor model. It enables fault-tolerant stream processing with automatic checkpointing, resource management, and cross-language integration between Python and Java workloads.

**Key Features:**

- Distributed stream processing with fault tolerance
- Single-node failover mechanism for fast recovery
- Cross-language support (Python and Java operators)
- Built-in checkpointing and state management
- Integration with Ray's distributed computing capabilities

## Core Imports

```python
from ray.streaming import StreamingContext
from ray.streaming.datastream import DataStream, StreamSource
from ray.streaming.function import SourceFunction, CollectionSourceFunction
```

## Basic Usage

### Simple Word Count Example

```python
import ray
from ray.streaming import StreamingContext

# Initialize Ray and create the streaming context
ray.init()
ctx = StreamingContext.Builder().build()

# Create a simple streaming pipeline
ctx.read_text_file("input.txt") \
    .set_parallelism(1) \
    .flat_map(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .key_by(lambda x: x[0]) \
    .reduce(lambda old_value, new_value:
            (old_value[0], old_value[1] + new_value[1])) \
    .filter(lambda x: "ray" not in x) \
    .sink(lambda x: print("result", x))

# Submit the job
ctx.submit("word_count_job")
```

### Creating Data Streams from Collections

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Create a stream from values
ctx.from_values("a", "b", "c") \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(x))

# Create a stream from a collection
data = [1, 2, 3, 4, 5]
ctx.from_collection(data) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Even doubled: {x}"))

ctx.submit("collection_processing")
```

## Architecture

Ray Streaming implements a master-worker architecture using Ray actors:

### Core Components

- **StreamingContext**: Main entry point for creating and configuring streaming jobs
- **DataStream**: Represents a stream of data elements that can be transformed
- **StreamSource**: Entry point for data ingestion into the streaming pipeline
- **Operators**: Transformation functions (map, filter, reduce, etc.) applied to streams
- **Internal Runtime**: Java-based execution engine handling distributed processing, fault tolerance, and resource management

### Fault Tolerance

Ray Streaming provides automatic fault tolerance through:

- Periodic checkpointing of operator state (see the configuration sketch below)
- Single-node failover that restarts only the failed components
- Automatic recovery from the last successful checkpoint
- Message replay from upstream operators during recovery
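
How often state is checkpointed is controlled through the job configuration; a minimal sketch, assuming the `streaming.checkpoint.interval` option shown in the Configuration Options section below and that its value is in milliseconds:

```python
from ray.streaming import StreamingContext

# Sketch: request more frequent checkpoints (assuming the interval is in
# milliseconds). Recovery from the latest checkpoint is handled by the
# runtime and needs no application code.
ctx = StreamingContext.Builder() \
    .option("streaming.checkpoint.interval", "5000") \
    .build()
```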

## Capabilities

### Streaming Context and Job Management

Main entry point for Ray Streaming functionality with job lifecycle management.

```python { .api }
class StreamingContext:
    class Builder:
        def option(self, key=None, value=None, conf=None) -> Builder
        def build(self) -> StreamingContext

    def source(self, source_func: SourceFunction) -> StreamSource
    def from_values(self, *values) -> StreamSource
    def from_collection(self, values) -> StreamSource
    def read_text_file(self, filename: str) -> StreamSource
    def submit(self, job_name: str) -> None
```

[→ Streaming Context Documentation](./streaming-context.md)

### Data Streams and Transformations

Stream transformation operations and data flow management.

```python { .api }
class DataStream:
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream
    def key_by(self, func) -> KeyDataStream
    def union(self, *others) -> DataStream
    def sink(self, func) -> DataStreamSink
    def set_parallelism(self, parallelism: int) -> DataStream
```

[→ Data Streams Documentation](./data-streams.md)
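
For example, two streams can be merged with `union` before a keyed aggregation. A minimal sketch, assuming `from_values` sources can be combined like any other `DataStream`; the tuple layout and job name are illustrative:

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

clicks = ctx.from_values(("user1", 1), ("user2", 1))
views = ctx.from_values(("user1", 1), ("user1", 1))

# Merge both streams, key by user id, and keep a running count per user.
clicks.union(views) \
    .key_by(lambda event: event[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print("count", result))

ctx.submit("union_example")
```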

### Source Functions and Data Ingestion

Data source implementations and custom source function interfaces.

```python { .api }
class SourceFunction:
    def init(self, parallel_id: int, num_parallel: int) -> None
    def fetch(self, collector) -> None

class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)

class LocalFileSourceFunction(SourceFunction):
    def __init__(self, filename: str)
```

[→ Source Functions Documentation](./source-functions.md)
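
A source function is attached to a pipeline through `StreamingContext.source()`; a brief sketch using the built-in `CollectionSourceFunction`, with an illustrative job name:

```python
from ray.streaming import StreamingContext
from ray.streaming.function import CollectionSourceFunction

ctx = StreamingContext.Builder().build()

# Wrap an in-memory collection in a source function and feed it into a pipeline.
ctx.source(CollectionSourceFunction(["a", "b", "c"])) \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(x))

ctx.submit("source_function_example")
```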

### Stream Processing Operations

Core stream transformation operators and windowing functions.

```python { .api }
# Transformation Operations
DataStream.map(func)       # One-to-one transformation
DataStream.flat_map(func)  # One-to-many transformation
DataStream.filter(func)    # Element filtering
DataStream.union(*others)  # Stream union

# Keyed Operations
KeyDataStream.reduce(func)         # Stateful reduction
KeyDataStream.window(window_func)  # Windowing operations
```

[→ Stream Operations Documentation](./stream-operations.md)

### Cross-Language Integration

Support for mixed Python/Java streaming applications.

```python { .api }
# Convert to a Java stream for Java operators
DataStream.as_java_stream()

# Use Java operators with class names
java_stream.map("com.example.MyMapper")
java_stream.filter("com.example.MyFilter")

# Convert back to a Python stream
java_stream.as_python_stream()
```

[→ Cross-Language Support Documentation](./cross-language.md)
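
Putting these calls together, a Python pipeline can hand part of the dataflow to Java operators and then continue in Python. A sketch based on the API above, where `com.example.MyMapper`, `com.example.MyFilter`, and the job name are placeholders:

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Python source -> Java operators -> Python sink
ctx.from_values("a", "b", "c") \
    .as_java_stream() \
    .map("com.example.MyMapper") \
    .filter("com.example.MyFilter") \
    .as_python_stream() \
    .sink(lambda x: print(x))

ctx.submit("hybrid_python_java_job")
```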

## Advanced Features

### Custom Source Functions

Create custom data sources by implementing the `SourceFunction` interface:

```python
from ray.streaming.function import SourceFunction

class CustomSourceFunction(SourceFunction):
    def init(self, parallel_id, num_parallel):
        # Set up per-instance state before fetching starts
        self.counter = 0
        self.max_count = 1000

    def fetch(self, collector):
        # Emit the next element through the collector
        if self.counter < self.max_count:
            collector.collect(f"message-{self.counter}")
            self.counter += 1
        else:
            # Signal end of stream
            collector.close()
```
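
Once defined, the custom source is plugged into a job via `StreamingContext.source()`; a short sketch where the downstream operators and job name are only illustrative:

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Consume elements emitted by the custom source defined above.
ctx.source(CustomSourceFunction()) \
    .filter(lambda msg: msg.endswith("0")) \
    .sink(lambda msg: print("got", msg))

ctx.submit("custom_source_job")
```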

### Configuration Options

Configure streaming jobs with various options:

```python
ctx = StreamingContext.Builder() \
    .option("streaming.worker-num", "4") \
    .option("streaming.context.backend.type", "MEMORY") \
    .option("streaming.checkpoint.interval", "5000") \
    .build()
```

### Error Handling and Monitoring

Ray Streaming integrates with Ray's monitoring and error handling:

- Built-in metrics collection and reporting
- Integration with Ray Dashboard for job monitoring
- Automatic error recovery and notification
- Configurable retry policies and failure handling

## Getting Started

1. **Install Ray**: `pip install ray`
2. **Initialize Ray**: `ray.init()`
3. **Create Context**: `ctx = StreamingContext.Builder().build()`
4. **Build Pipeline**: Chain stream operations using the fluent API (see the sketch below)
5. **Submit Job**: `ctx.submit("job_name")`
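
These steps fit together as follows; a minimal sketch where the values and job name are illustrative:

```python
import ray
from ray.streaming import StreamingContext

ray.init()  # 2. Initialize Ray
ctx = StreamingContext.Builder().build()  # 3. Create the streaming context

# 4. Build a pipeline with the fluent API
ctx.from_values("hello", "ray", "streaming") \
    .map(lambda word: word.upper()) \
    .sink(lambda word: print(word))

ctx.submit("getting_started_job")  # 5. Submit the job
```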

For detailed examples and API references, see the individual capability documentation pages linked above.