# Data Streams and Transformations

This document covers the data stream classes and transformation operations in Ray Streaming, including DataStream, KeyDataStream, and their Java counterparts for cross-language integration.

## Overview

Ray Streaming provides a fluent API for stream transformations through several stream classes:

- **DataStream**: Main stream class for Python-based transformations
- **KeyDataStream**: Keyed stream for stateful operations such as reduce
- **StreamSource**: Entry-point streams created from data sources
- **UnionStream**: Result of union operations between multiple streams
- **JavaDataStream**: Java-based stream for cross-language operations

## Stream Base Class

All stream classes inherit from the abstract `Stream` base class.

### Core API

```python { .api }
from ray.streaming.datastream import Stream

class Stream:
    def get_parallelism(self) -> int
    def set_parallelism(self, parallelism: int) -> Stream
    def get_id(self) -> str
    def with_config(self, key=None, value=None, conf=None) -> Stream
    def get_config(self) -> dict
    def forward(self) -> Stream
    def disable_chain(self) -> Stream
    def get_input_stream(self) -> Stream
    def get_streaming_context(self) -> StreamingContext
```
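
Conceptually, `with_config` accepts either a single key/value pair or a whole `conf` dict and merges it into the stream's configuration. A plain-dict sketch of those assumed semantics (`merged_config` is a hypothetical helper for illustration, not the Ray Streaming implementation):

```python
def merged_config(current, key=None, value=None, conf=None):
    """Return a new config dict with key/value and conf merged in (hypothetical helper)."""
    result = dict(current)
    if key is not None:
        result[key] = value
    if conf is not None:
        result.update(conf)
    return result

cfg = merged_config({}, key="streaming.buffer.size", value="1024")
cfg = merged_config(cfg, conf={"streaming.timeout": "5000"})
print(cfg)  # {'streaming.buffer.size': '1024', 'streaming.timeout': '5000'}
```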

## DataStream

The main stream class for Python-based stream processing and transformations.

### Core API

```python { .api }
from ray.streaming.datastream import DataStream

class DataStream(Stream):
    # Element-wise transformations
    def map(self, func) -> DataStream
    def flat_map(self, func) -> DataStream
    def filter(self, func) -> DataStream

    # Stream composition
    def union(self, *streams) -> UnionStream

    # Partitioning and keying
    def key_by(self, func) -> KeyDataStream
    def broadcast(self) -> DataStream
    def partition_by(self, partition_func) -> DataStream

    # Output operations
    def sink(self, func) -> StreamSink

    # Cross-language integration
    def as_java_stream(self) -> JavaDataStream
```

## Capabilities

### Element-wise Transformations

Apply functions to individual elements in the stream.

```python { .api }
# Map transformation - one-to-one element transformation
def map(self, func) -> DataStream:
    """
    Transform each element using the provided function.

    Args:
        func: Function or MapFunction instance for transformation

    Returns:
        New DataStream with transformed elements
    """

# FlatMap transformation - one-to-many element transformation
def flat_map(self, func) -> DataStream:
    """
    Transform each element into zero or more output elements.

    Args:
        func: Function or FlatMapFunction that returns an iterable

    Returns:
        New DataStream with flattened results
    """

# Filter transformation - element filtering
def filter(self, func) -> DataStream:
    """
    Keep only elements that satisfy the predicate.

    Args:
        func: Function or FilterFunction returning a boolean

    Returns:
        New DataStream with filtered elements
    """
```

### Usage Examples

```python
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

# Map transformation
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Doubled: {x}"))

# FlatMap transformation
ctx.from_values("hello world", "ray streaming") \
    .flat_map(lambda line: line.split()) \
    .map(lambda word: word.upper()) \
    .sink(lambda x: print(f"Word: {x}"))

# Filter transformation
ctx.from_collection(range(10)) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Even square: {x}"))
```
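
For intuition, the element-by-element effect of these three pipelines on a local list can be sketched with plain Python builtins (this illustrates the semantics only and does not use the Ray Streaming API):

```python
# map: one output element per input element
doubled = [x * 2 for x in [1, 2, 3, 4, 5]]  # [2, 4, 6, 8, 10]

# flat_map: each input may yield zero or more outputs, flattened into one stream
words = [w.upper() for line in ["hello world", "ray streaming"] for w in line.split()]
# ['HELLO', 'WORLD', 'RAY', 'STREAMING']

# filter then map: keep even numbers, then square them
even_squares = [x ** 2 for x in range(10) if x % 2 == 0]  # [0, 4, 16, 36, 64]
```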

### Stream Composition

Combine multiple streams into unified processing pipelines.

```python { .api }
def union(self, *streams) -> UnionStream:
    """
    Merge multiple streams of the same type.

    Args:
        *streams: DataStreams to union with this stream

    Returns:
        UnionStream containing elements from all input streams
    """
```

### Usage Example

```python
# Union multiple streams
stream1 = ctx.from_values(1, 2, 3)
stream2 = ctx.from_values(4, 5, 6)
stream3 = ctx.from_values(7, 8, 9)

unified = stream1.union(stream2, stream3) \
    .map(lambda x: x * 10) \
    .sink(lambda x: print(f"Unified: {x}"))
```

### Partitioning and Keying

Control data distribution and enable stateful operations.

```python { .api }
def key_by(self, func) -> KeyDataStream:
    """
    Partition the stream by key for stateful operations.

    Args:
        func: Function or KeyFunction to extract the key from elements

    Returns:
        KeyDataStream partitioned by the key function
    """

def broadcast(self) -> DataStream:
    """
    Broadcast all elements to every parallel instance.

    Returns:
        DataStream with broadcast partitioning
    """

def partition_by(self, partition_func) -> DataStream:
    """
    Partition the stream using a custom partitioning function.

    Args:
        partition_func: Function or Partition instance for custom partitioning

    Returns:
        DataStream with custom partitioning
    """
```

### Usage Examples

```python
# Key-based processing
ctx.from_values(("a", 1), ("b", 2), ("a", 3), ("b", 4)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda x: print(f"Sum for {x[0]}: {x[1]}"))

# Broadcast partitioning
ctx.from_values("broadcast", "message") \
    .broadcast() \
    .map(lambda x: x.upper()) \
    .sink(lambda x: print(f"Broadcast: {x}"))

# Custom partitioning
def custom_partition(element):
    return hash(element) % 4

ctx.from_collection(range(20)) \
    .partition_by(custom_partition) \
    .map(lambda x: f"Partition-{x}") \
    .sink(print)
```
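
To see how `custom_partition` spreads work, the same hash-mod rule can be run locally over plain Python integers (CPython hashes small ints to themselves, so the distribution here is deterministic; a real stream would apply it per element):

```python
from collections import Counter

def custom_partition(element):
    return hash(element) % 4

# Count how many of the 20 elements land in each of the 4 partitions
distribution = Counter(custom_partition(x) for x in range(20))
print(dict(distribution))  # each of partitions 0-3 receives 5 elements
```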

## KeyDataStream

Specialized stream for keyed operations that maintain state per key.

### Core API

```python { .api }
from ray.streaming.datastream import KeyDataStream

class KeyDataStream(DataStream):
    def reduce(self, func) -> DataStream
    def as_java_stream(self) -> JavaKeyDataStream
```

### Stateful Operations

```python { .api }
def reduce(self, func) -> DataStream:
    """
    Apply a reduce function to elements that share the same key.

    Args:
        func: Function or ReduceFunction for combining values

    Returns:
        DataStream with reduced values per key
    """
```

252

253

### Usage Examples

254

255

```python

256

# Word count with reduce

257

ctx.read_text_file("document.txt") \

258

.flat_map(lambda line: line.split()) \

259

.map(lambda word: (word.lower(), 1)) \

260

.key_by(lambda pair: pair[0]) \

261

.reduce(lambda old, new: (old[0], old[1] + new[1])) \

262

.sink(lambda result: print(f"{result[0]}: {result[1]}"))

263

264

# Sum by category

265

data = [("fruits", 10), ("vegetables", 5), ("fruits", 7), ("vegetables", 3)]

266

ctx.from_collection(data) \

267

.key_by(lambda item: item[0]) \

268

.reduce(lambda old, new: (old[0], old[1] + new[1])) \

269

.sink(lambda result: print(f"Total {result[0]}: {result[1]}"))

270

```
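
The per-key accumulation that `key_by(...).reduce(...)` performs can be emulated on a local list with a dictionary holding one running value per key, using the same reduce lambda (an illustration of the semantics, not the Ray Streaming API):

```python
data = [("fruits", 10), ("vegetables", 5), ("fruits", 7), ("vegetables", 3)]
reduce_fn = lambda old, new: (old[0], old[1] + new[1])

state = {}  # key -> current reduced value, as a keyed operator would hold it
for item in data:
    key = item[0]
    state[key] = reduce_fn(state[key], item) if key in state else item

print(state)  # {'fruits': ('fruits', 17), 'vegetables': ('vegetables', 8)}
```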

## StreamSource

Entry-point streams created from data sources.

### Core API

```python { .api }
from ray.streaming.datastream import StreamSource

class StreamSource(DataStream):
    @staticmethod
    def build_source(streaming_context, func) -> StreamSource
```

StreamSource inherits all DataStream transformation methods and serves as the starting point for stream processing pipelines.

### Usage Examples

```python
from ray.streaming.function import SourceFunction

class NumberSource(SourceFunction):
    def init(self, parallel_id, num_parallel):
        self.current = parallel_id
        self.step = num_parallel
        self.max_num = 100

    def fetch(self, collector):
        while self.current < self.max_num:
            collector.collect(self.current)
            self.current += self.step

# Create a stream from the custom source
source_stream = StreamSource.build_source(ctx, NumberSource())
source_stream.map(lambda x: x * 2) \
    .filter(lambda x: x > 10) \
    .sink(print)
```
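
Each parallel instance of `NumberSource` starts at its own `parallel_id` and advances by `num_parallel`, so the instances emit disjoint residue classes that together cover the whole range. That scheme can be checked locally with plain Python (no Ray required):

```python
def emitted(parallel_id, num_parallel, max_num=100):
    """Values one NumberSource instance would collect, per its init/fetch logic."""
    return list(range(parallel_id, max_num, num_parallel))

num_parallel = 4
parts = [emitted(i, num_parallel) for i in range(num_parallel)]

# No overlap between instances, and 0..99 is covered exactly once overall
all_values = sorted(v for part in parts for v in part)
print(all_values == list(range(100)))  # True
```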

## Cross-Language Integration

Ray Streaming supports mixed Python/Java operations through stream conversion.

### JavaDataStream

Java-based stream for cross-language operations.

```python { .api }
from ray.streaming.datastream import JavaDataStream

class JavaDataStream(Stream):
    # Java operator methods (take fully qualified Java class names)
    def map(self, java_func_class) -> JavaDataStream
    def flat_map(self, java_func_class) -> JavaDataStream
    def filter(self, java_func_class) -> JavaDataStream
    def union(self, *streams) -> JavaUnionStream
    def key_by(self, java_func_class) -> JavaKeyDataStream
    def sink(self, java_func_class) -> JavaStreamSink

    # Convert back to a Python stream
    def as_python_stream(self) -> DataStream
```

### Usage Examples

```python
# Mixed Python/Java processing
python_stream = ctx.from_values("hello", "world", "ray") \
    .map(lambda x: x.upper())

# Convert to Java to apply Java operators
java_stream = python_stream.as_java_stream() \
    .map("com.example.JavaMapperFunction") \
    .filter("com.example.JavaFilterFunction")

# Convert back to Python
result_stream = java_stream.as_python_stream() \
    .map(lambda x: f"Processed: {x}") \
    .sink(print)
```

352

353

## Stream Configuration

354

355

Configure stream behavior using the configuration system.

356

357

### Stream-Level Configuration

358

359

```python

360

# Configure individual streams

361

stream = ctx.from_collection(range(100)) \

362

.set_parallelism(4) \

363

.with_config("streaming.buffer.size", "1024") \

364

.with_config("streaming.timeout", "5000")

365

366

# Multiple configuration options

367

config = {

368

"streaming.checkpoint.interval": "10000",

369

"streaming.queue.capacity": "500"

370

}

371

configured_stream = stream.with_config(conf=config)

372

```

373

374

### Performance Optimization

375

376

```python

377

# Optimize for throughput

378

high_throughput_stream = ctx.from_collection(large_dataset) \

379

.set_parallelism(8) \

380

.with_config("streaming.buffer.size", "4096") \

381

.disable_chain() # Prevent operator chaining

382

383

# Optimize for latency

384

low_latency_stream = ctx.from_collection(data) \

385

.set_parallelism(1) \

386

.forward() # Local forwarding

387

.with_config("streaming.timeout", "100")

388

```

## Advanced Patterns

### Pipeline Branching

```python
# Split a stream into multiple processing branches
source = ctx.from_collection(range(100))

# Branch 1: even numbers
evens = source.filter(lambda x: x % 2 == 0) \
    .map(lambda x: f"Even: {x}")

# Branch 2: odd numbers
odds = source.filter(lambda x: x % 2 == 1) \
    .map(lambda x: f"Odd: {x}")

# Merge the branches
evens.union(odds).sink(print)
```

### Stateful Processing

```python
# Maintain running totals per key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda running_total, new_value:
            (running_total[0], running_total[1] + new_value[1])) \
    .sink(lambda result: print(f"Running total for {result[0]}: {result[1]}"))
```

## Error Handling

Handle errors in stream processing operations.

```python
def safe_transform(x):
    try:
        return int(x) * 2
    except ValueError:
        return 0  # Default value for invalid input

ctx.from_values("1", "2", "invalid", "4") \
    .map(safe_transform) \
    .filter(lambda x: x > 0) \
    .sink(lambda x: print(f"Valid result: {x}"))
```
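
Because `safe_transform` is ordinary Python, its error-swallowing behavior can be checked locally before wiring it into a pipeline (plain builtins stand in for the map and filter operators here):

```python
def safe_transform(x):
    try:
        return int(x) * 2
    except ValueError:
        return 0  # Default value for invalid input

inputs = ["1", "2", "invalid", "4"]
mapped = [safe_transform(x) for x in inputs]  # [2, 4, 0, 8]
survivors = [x for x in mapped if x > 0]      # the filter drops the sentinel 0
print(survivors)  # [2, 4, 8]
```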

## See Also

- [Streaming Context Documentation](./streaming-context.md) - StreamingContext and job management
- [Source Functions Documentation](./source-functions.md) - Custom data source implementation
- [Stream Operations Documentation](./stream-operations.md) - Detailed transformation operations
- [Cross-Language Support Documentation](./cross-language.md) - Python/Java integration details