# Stream Processing Operations

This document covers the detailed stream transformation operations available in Ray Streaming, including transformation functions, partitioning strategies, and advanced stream processing patterns.

## Overview

Ray Streaming provides a comprehensive set of stream transformation operations:

- **Element Transformations**: map, flat_map, filter
- **Keyed Operations**: key_by, reduce
- **Stream Composition**: union
- **Partitioning**: broadcast, partition_by, forward
- **Output Operations**: sink
- **Function Interfaces**: MapFunction, FilterFunction, ReduceFunction, etc.

## Function Interfaces

Ray Streaming defines function interfaces for type-safe and extensible stream processing.

### Core Function Interface

```python { .api }
from ray.streaming.function import Function

class Function:
    def open(self, runtime_context) -> None
    def close(self) -> None
    def save_checkpoint(self) -> object
    def load_checkpoint(self, checkpoint_obj) -> None
```
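
These lifecycle hooks let a function initialize resources in `open`, release them in `close`, and carry state across failures via the checkpoint pair. A minimal sketch of how the checkpoint hooks fit together (written as a standalone class for illustration; in a real job it would extend `Function`):

```python
# Illustrative stateful function exercising the lifecycle hooks.
# Standalone for clarity; in a job it would subclass
# ray.streaming.function.Function.
class CountingFunction:
    def __init__(self):
        self.count = 0

    def open(self, runtime_context):
        # Called once before processing starts.
        self.count = 0

    def save_checkpoint(self):
        # Return the state object the framework should persist.
        return self.count

    def load_checkpoint(self, checkpoint_obj):
        # Restore state from a previously saved checkpoint.
        self.count = checkpoint_obj

    def close(self):
        # Release any resources acquired in open().
        pass

# A restarted instance restored from a checkpoint resumes the old count.
f1 = CountingFunction()
f1.open(None)
f1.count = 5
checkpoint = f1.save_checkpoint()

f2 = CountingFunction()
f2.load_checkpoint(checkpoint)
```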

### Transformation Function Interfaces

```python { .api }
# Map function interface
class MapFunction(Function):
    def map(self, value) -> object

# FlatMap function interface
class FlatMapFunction(Function):
    def flat_map(self, value, collector) -> None

# Filter function interface
class FilterFunction(Function):
    def filter(self, value) -> bool

# Key extraction function interface
class KeyFunction(Function):
    def key_by(self, value) -> object

# Reduce function interface
class ReduceFunction(Function):
    def reduce(self, old_value, new_value) -> object

# Sink function interface
class SinkFunction(Function):
    def sink(self, value) -> None
```

## Element Transformations

### Map Operation

Transform each element in the stream using a one-to-one function.

```python { .api }
def map(self, func) -> DataStream:
    """
    Apply a function to each element in the stream.

    Args:
        func: Function or MapFunction instance for transformation

    Returns:
        New DataStream with transformed elements
    """
```

#### Usage Examples

```python
from ray.streaming import StreamingContext
from ray.streaming.function import MapFunction

# Using lambda function
ctx = StreamingContext.Builder().build()
ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(lambda x: x * 2) \
    .sink(lambda x: print(f"Doubled: {x}"))

# Using custom MapFunction
class SquareMapFunction(MapFunction):
    def map(self, value):
        return value ** 2

ctx.from_collection([1, 2, 3, 4, 5]) \
    .map(SquareMapFunction()) \
    .sink(lambda x: print(f"Squared: {x}"))

# Complex transformation
ctx.from_collection(["hello", "world", "ray", "streaming"]) \
    .map(lambda word: {"word": word, "length": len(word), "upper": word.upper()}) \
    .sink(lambda obj: print(f"Word: {obj['word']}, Length: {obj['length']}"))

ctx.submit("map_operations")
```

### FlatMap Operation

Transform each element into zero or more output elements.

```python { .api }
def flat_map(self, func) -> DataStream:
    """
    Transform each element into multiple output elements.

    Args:
        func: Function or FlatMapFunction that returns iterable

    Returns:
        New DataStream with flattened results
    """
```
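
The collector-based `FlatMapFunction` variant differs from the lambda form: instead of returning an iterable, it pushes each output through a collector. That logic can be exercised outside a running job with a stand-in collector (a sketch; `ListCollector` is illustrative, not part of the Ray Streaming API):

```python
class ListCollector:
    # Stand-in for the collector a running job passes to flat_map().
    def __init__(self):
        self.items = []

    def collect(self, value):
        self.items.append(value)

class SplitWords:
    # FlatMapFunction-style logic: one input line, many output words.
    def flat_map(self, value, collector):
        for word in value.split():
            collector.collect(word)

collector = ListCollector()
fn = SplitWords()
for line in ["hello world", "ray streaming"]:
    fn.flat_map(line, collector)
# collector.items is now ["hello", "world", "ray", "streaming"]
```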

#### Usage Examples

```python
from ray.streaming.function import FlatMapFunction

# Split sentences into words
ctx.from_values("hello world", "ray streaming", "distributed computing") \
    .flat_map(lambda sentence: sentence.split()) \
    .map(lambda word: word.upper()) \
    .sink(lambda word: print(f"Word: {word}"))

# Using custom FlatMapFunction
class TokenizeFlatMapFunction(FlatMapFunction):
    def flat_map(self, value, collector):
        words = value.split()
        for word in words:
            if len(word) > 3:  # Only emit words longer than 3 characters
                collector.collect(word.lower())

ctx.from_values("The quick brown fox jumps over the lazy dog") \
    .flat_map(TokenizeFlatMapFunction()) \
    .sink(print)

# Generate multiple outputs per input
ctx.from_collection([1, 2, 3]) \
    .flat_map(lambda x: [x, x * 2, x * 3]) \
    .sink(lambda x: print(f"Generated: {x}"))

ctx.submit("flatmap_operations")
```

### Filter Operation

Keep only elements that satisfy a predicate condition.

```python { .api }
def filter(self, func) -> DataStream:
    """
    Filter elements based on a predicate function.

    Args:
        func: Function or FilterFunction returning boolean

    Returns:
        New DataStream with filtered elements
    """
```

#### Usage Examples

```python
from ray.streaming.function import FilterFunction

# Simple filtering
ctx.from_collection(range(10)) \
    .filter(lambda x: x % 2 == 0) \
    .sink(lambda x: print(f"Even: {x}"))

# Using custom FilterFunction
class PositiveFilterFunction(FilterFunction):
    def filter(self, value):
        return value > 0

ctx.from_collection([-3, -1, 0, 1, 5, -2, 8]) \
    .filter(PositiveFilterFunction()) \
    .sink(lambda x: print(f"Positive: {x}"))

# Complex filtering with string operations
ctx.from_values("apple", "banana", "cherry", "date", "elderberry") \
    .filter(lambda fruit: len(fruit) > 5 and 'e' in fruit) \
    .sink(lambda fruit: print(f"Long fruit with 'e': {fruit}"))

ctx.submit("filter_operations")
```

## Keyed Operations

### Key-By Operation

Partition the stream by key for stateful operations.

```python { .api }
def key_by(self, func) -> KeyDataStream:
    """
    Partition stream by key extracted using the provided function.

    Args:
        func: Function or KeyFunction to extract key from elements

    Returns:
        KeyDataStream partitioned by the key function
    """
```

#### Usage Examples

```python
from ray.streaming.function import KeyFunction

# Group by key for word counting
ctx.from_values("apple", "banana", "apple", "cherry", "banana", "apple") \
    .map(lambda word: (word, 1)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Count: {result[0]} = {result[1]}"))

# Using custom KeyFunction
class CategoryKeyFunction(KeyFunction):
    def key_by(self, value):
        # Group items by their category
        categories = {
            'fruits': ['apple', 'banana', 'cherry'],
            'vegetables': ['carrot', 'broccoli', 'spinach'],
            'grains': ['rice', 'wheat', 'oats']
        }
        for category, items in categories.items():
            if value in items:
                return category
        return 'other'

ctx.from_values("apple", "carrot", "banana", "rice", "broccoli") \
    .key_by(CategoryKeyFunction()) \
    .reduce(lambda old, new: f"{old},{new}") \
    .sink(lambda result: print(f"Category items: {result}"))

ctx.submit("keyed_operations")
```

### Reduce Operation

Combine elements with the same key using a reduce function.

```python { .api }
def reduce(self, func) -> DataStream:
    """
    Reduce elements with the same key using the provided function.

    Args:
        func: Function or ReduceFunction for combining values

    Returns:
        DataStream with reduced values per key
    """
```
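
Conceptually, `reduce` keeps one running value per key and calls the function with the previous and incoming values, emitting the updated result each time. A plain-Python sketch of that semantics (not the Ray Streaming API itself):

```python
def simulate_keyed_reduce(elements, key_func, reduce_func):
    """Emit the running reduction per key, as key_by(...).reduce(...) does."""
    state = {}  # key -> current reduced value
    emitted = []
    for element in elements:
        key = key_func(element)
        if key in state:
            state[key] = reduce_func(state[key], element)
        else:
            # The first element for a key is emitted unchanged.
            state[key] = element
        emitted.append(state[key])
    return emitted

results = simulate_keyed_reduce(
    [("A", 10), ("B", 5), ("A", 15)],
    key_func=lambda pair: pair[0],
    reduce_func=lambda old, new: (old[0], old[1] + new[1]),
)
# results == [("A", 10), ("B", 5), ("A", 25)]
```

Note that a downstream sink sees every intermediate per-key value, not just the final one.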

#### Usage Examples

```python
from ray.streaming.function import ReduceFunction

# Sum values by key
ctx.from_values(("A", 10), ("B", 5), ("A", 15), ("B", 20), ("A", 8)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Sum for {result[0]}: {result[1]}"))

# Using custom ReduceFunction
class MaxReduceFunction(ReduceFunction):
    def reduce(self, old_value, new_value):
        return max(old_value, new_value, key=lambda x: x[1])

ctx.from_values(("user1", 100), ("user2", 85), ("user1", 120), ("user2", 95)) \
    .key_by(lambda pair: pair[0]) \
    .reduce(MaxReduceFunction()) \
    .sink(lambda result: print(f"Max score for {result[0]}: {result[1]}"))

# String aggregation
ctx.from_values(("group1", "a"), ("group2", "x"), ("group1", "b"), ("group2", "y")) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda old, new: (old[0], old[1] + new[1])) \
    .sink(lambda result: print(f"Concatenated {result[0]}: {result[1]}"))

ctx.submit("reduce_operations")
```

## Stream Composition

### Union Operation

Merge multiple streams of the same type.

```python { .api }
def union(self, *streams) -> UnionStream:
    """
    Union this stream with other streams.

    Args:
        *streams: DataStreams to union with this stream

    Returns:
        UnionStream containing elements from all input streams
    """
```

#### Usage Examples

```python
# Union multiple data sources
stream1 = ctx.from_values("source1-a", "source1-b", "source1-c")
stream2 = ctx.from_values("source2-x", "source2-y", "source2-z")
stream3 = ctx.from_values("source3-1", "source3-2", "source3-3")

# Union all streams
stream1.union(stream2, stream3) \
    .map(lambda x: f"Unified: {x}") \
    .sink(print)

# Union with different processing branches
numbers = ctx.from_collection(range(20))
evens = numbers.filter(lambda x: x % 2 == 0).map(lambda x: f"Even: {x}")
odds = numbers.filter(lambda x: x % 2 == 1).map(lambda x: f"Odd: {x}")

evens.union(odds).sink(print)

ctx.submit("union_operations")
```

## Partitioning Strategies

### Broadcast Partitioning

Send all elements to every parallel instance of the next operator.

```python { .api }
def broadcast(self) -> DataStream:
    """
    Broadcast all elements to every parallel instance.

    Returns:
        DataStream with broadcast partitioning
    """
```

#### Usage Examples

```python
# Broadcast configuration data to all workers
ctx.from_values("config1", "config2", "config3") \
    .broadcast() \
    .map(lambda config: f"All workers got: {config}") \
    .sink(print)

# Broadcast a lookup table
ctx.from_values(("key1", "value1"), ("key2", "value2")) \
    .broadcast() \
    .map(lambda pair: f"Lookup: {pair[0]} -> {pair[1]}") \
    .sink(print)

ctx.submit("broadcast_operations")
```

373

374

### Custom Partitioning

375

376

Use custom partitioning logic to control data distribution.

377

378

```python { .api }

379

def partition_by(self, partition_func) -> DataStream:

380

"""

381

Partition stream using custom partitioning function.

382

383

Args:

384

partition_func: Function or Partition instance for custom partitioning

385

386

Returns:

387

DataStream with custom partitioning

388

"""

389

```
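
The partition function's contract is to map each element to an integer partition index; conceptually, the framework then routes each element to the parallel channel with that index. A plain-Python sketch of the routing (an illustration, not the Ray Streaming implementation):

```python
def route(elements, partition_func, num_partitions):
    """Assign each element to an output channel by its partition index."""
    channels = [[] for _ in range(num_partitions)]
    for element in elements:
        # Modulo keeps the index valid even if the function over-shoots.
        index = partition_func(element) % num_partitions
        channels[index].append(element)
    return channels

channels = route(range(8), lambda x: x, num_partitions=4)
# channels == [[0, 4], [1, 5], [2, 6], [3, 7]]
```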

390

391

#### Usage Examples

392

393

```python

394

from ray.streaming.partition import Partition

395

396

# Hash-based partitioning

397

def hash_partition(element):

398

return hash(str(element)) % 4

399

400

ctx.from_collection(range(20)) \

401

.partition_by(hash_partition) \

402

.map(lambda x: f"Partitioned: {x}") \

403

.sink(print)

404

405

# Custom partitioning class

406

class RegionPartition(Partition):

407

def partition(self, record, num_partitions):

408

region_map = {"US": 0, "EU": 1, "ASIA": 2}

409

region = record.get("region", "OTHER")

410

return region_map.get(region, 3) % num_partitions

411

412

regions_data = [

413

{"id": 1, "region": "US", "data": "user1"},

414

{"id": 2, "region": "EU", "data": "user2"},

415

{"id": 3, "region": "ASIA", "data": "user3"}

416

]

417

418

ctx.from_collection(regions_data) \

419

.partition_by(RegionPartition()) \

420

.map(lambda record: f"Region {record['region']}: {record['data']}") \

421

.sink(print)

422

423

ctx.submit("partition_operations")

424

```

425

426

### Forward Partitioning

427

428

Forward elements locally to the next operator.

429

430

```python { .api }

431

def forward(self) -> DataStream:

432

"""

433

Forward elements locally to avoid network transfer.

434

435

Returns:

436

DataStream with forward partitioning

437

"""

438

```

439

440

## Output Operations

441

442

### Sink Operation

443

444

Define output behavior for stream processing results.

445

446

```python { .api }

447

def sink(self, func) -> StreamSink:

448

"""

449

Create a sink for the stream.

450

451

Args:

452

func: Function or SinkFunction for handling output

453

454

Returns:

455

StreamSink representing the output operation

456

"""

457

```

458

459

#### Usage Examples

460

461

```python

462

from ray.streaming.function import SinkFunction

463

464

# Simple sink with lambda

465

ctx.from_collection([1, 2, 3, 4, 5]) \

466

.map(lambda x: x ** 2) \

467

.sink(lambda x: print(f"Result: {x}"))

468

469

# Custom SinkFunction

470

class FileSinkFunction(SinkFunction):

471

def __init__(self, filename):

472

self.filename = filename

473

self.file = None

474

475

def open(self, runtime_context):

476

self.file = open(self.filename, 'w')

477

478

def sink(self, value):

479

self.file.write(f"{value}\n")

480

self.file.flush()

481

482

def close(self):

483

if self.file:

484

self.file.close()

485

486

ctx.from_values("line1", "line2", "line3") \

487

.sink(FileSinkFunction("output.txt"))

488

489

# Database sink simulation

490

class DatabaseSinkFunction(SinkFunction):

491

def open(self, runtime_context):

492

print("Connecting to database...")

493

self.connection = "mock_db_connection"

494

495

def sink(self, value):

496

print(f"INSERT INTO results VALUES ('{value}')")

497

498

def close(self):

499

print("Closing database connection")

500

501

ctx.from_collection(range(5)) \

502

.map(lambda x: f"record_{x}") \

503

.sink(DatabaseSinkFunction())

504

505

ctx.submit("sink_operations")

506

```

507

508

## Advanced Stream Patterns

509

510

### Windowing Simulation

511

512

Although Ray Streaming doesn't have built-in windowing, you can simulate time-based processing.

513

514

```python

515

import time

516

from datetime import datetime

517

518

# Time-based processing simulation

519

def timestamped_data():

520

for i in range(10):

521

yield {"timestamp": datetime.now().isoformat(), "value": i}

522

time.sleep(0.5)

523

524

class TimestampedSource(SourceFunction):

525

def fetch(self, ctx):

526

for data in timestamped_data():

527

ctx.collect(data)

528

529

ctx.from_source(TimestampedSource()) \

530

.map(lambda record: f"Time: {record['timestamp']}, Value: {record['value']}") \

531

.sink(print)

532

```
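
Building on that, a downstream operator can bucket such timestamped records into fixed-size (tumbling) windows itself. A plain-Python sketch of the bucketing logic (not a built-in Ray Streaming feature; window boundaries here are aligned to multiples of the window size):

```python
from collections import defaultdict
from datetime import datetime

def bucket_by_window(records, window_seconds):
    """Group timestamped records into tumbling windows of window_seconds."""
    windows = defaultdict(list)
    for record in records:
        ts = datetime.fromisoformat(record["timestamp"]).timestamp()
        # Floor the timestamp to the start of its window.
        window_start = int(ts // window_seconds) * window_seconds
        windows[window_start].append(record["value"])
    return dict(windows)

records = [
    {"timestamp": "2024-01-01T00:00:00", "value": 0},
    {"timestamp": "2024-01-01T00:00:04", "value": 1},
    {"timestamp": "2024-01-01T00:00:06", "value": 2},
]
buckets = bucket_by_window(records, window_seconds=5)
# Two windows: values 0 and 1 fall in the first, value 2 in the next.
```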

533

534

### Multi-Stream Processing

535

536

Process multiple streams with different operations.

537

538

```python

539

# Create multiple processing branches

540

source = ctx.from_collection(range(100))

541

542

# Branch 1: Even numbers processing

543

evens = source.filter(lambda x: x % 2 == 0) \

544

.map(lambda x: f"Even: {x}")

545

546

# Branch 2: Odd numbers processing

547

odds = source.filter(lambda x: x % 2 == 1) \

548

.map(lambda x: f"Odd: {x}")

549

550

# Branch 3: Multiples of 5

551

fives = source.filter(lambda x: x % 5 == 0) \

552

.map(lambda x: f"Multiple of 5: {x}")

553

554

# Combine all branches

555

evens.union(odds, fives) \

556

.sink(print)

557

```

558

559

## Performance Considerations

560

561

### Operator Chaining

562

563

Control operator chaining for performance optimization.

564

565

```python

566

# Disable chaining for better parallelism

567

stream = ctx.from_collection(large_dataset) \

568

.disable_chain() \

569

.map(expensive_transformation) \

570

.disable_chain() \

571

.filter(complex_predicate)

572

573

# Use forward for local processing

574

local_stream = ctx.from_collection(data) \

575

.forward() \

576

.map(local_transformation)

577

```

578

579

### Parallelism Configuration

580

581

Configure parallelism for different operations.

582

583

```python

584

# Different parallelism for different operations

585

ctx.from_collection(data) \

586

.set_parallelism(8) \

587

.map(cpu_intensive_function) \

588

.set_parallelism(4) \

589

.reduce(reduction_function) \

590

.set_parallelism(1) \

591

.sink(output_function)

592

```

593

594

## See Also

595

596

- [Data Streams Documentation](./data-streams.md) - Stream classes and basic transformations

597

- [Source Functions Documentation](./source-functions.md) - Custom data source implementation

598

- [Streaming Context Documentation](./streaming-context.md) - StreamingContext and job management

599

- [Cross-Language Support Documentation](./cross-language.md) - Python/Java integration details