or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- authentication.md
- cli-framework.md
- core-application.md
- data-management.md
- index.md
- monitoring.md
- serialization.md
- stream-processing.md
- topics-channels.md
- windowing.md
- worker-management.md

docs/stream-processing.md

0

# Stream Processing

1

2

Stream processing components for consuming and transforming data streams in real time. This includes agents for processing message streams, stream transformation operations, and event handling for building reactive data processing pipelines.

3

4

## Capabilities

5

6

### Agent Class

7

8

Stream processing agents consume messages from channels or topics. An agent is an async function decorated with `@app.agent()`; once decorated, it automatically handles message consumption, acknowledgment, error handling, and scaling.

9

10

```python { .api }

11

class Agent:

12

def __init__(

13

self,

14

fun,

15

*,

16

channel=None,

17

name: str = None,

18

concurrency: int = 1,

19

sink: list = None,

20

on_error: callable = None,

21

supervisor_strategy: str = None,

22

help: str = None,

23

**kwargs

24

):

25

"""

26

Stream processing agent.

27

28

Args:

29

fun: Async function to process stream

30

channel: Channel or topic to consume from

31

name: Agent name

32

concurrency: Number of concurrent instances

33

sink: Channels to forward results to

34

on_error: Error handler function

35

supervisor_strategy: Error recovery strategy

36

help: Help text

37

"""

38

39

async def send(

40

self,

41

value=None,

42

*,

43

key=None,

44

partition: int = None

45

):

46

"""

47

Send message to agent's channel.

48

49

Args:

50

value: Message value

51

key: Message key

52

partition: Target partition

53

"""

54

55

async def ask(

56

self,

57

value=None,

58

*,

59

key=None,

60

partition: int = None,

61

reply_to: str = None,

62

correlation_id: str = None

63

):

64

"""

65

Send message and wait for reply (RPC-style).

66

67

Args:

68

value: Message value

69

key: Message key

70

partition: Target partition

71

reply_to: Reply topic

72

correlation_id: Request correlation ID

73

74

Returns:

75

Reply message

76

"""

77

78

def cast(

79

self,

80

value=None,

81

*,

82

key=None,

83

partition: int = None

84

):

85

"""

86

Send message without waiting (fire-and-forget).

87

88

Args:

89

value: Message value

90

key: Message key

91

partition: Target partition

92

"""

93

94

async def start(self):

95

"""Start the agent."""

96

97

async def stop(self):

98

"""Stop the agent."""

99

100

def cancel(self):

101

"""Cancel the agent."""

102

103

@property

104

def channel(self):

105

"""Agent's input channel."""

106

107

@property

108

def concurrency(self) -> int:

109

"""Number of concurrent instances."""

110

111

@property

112

def help(self) -> str:

113

"""Help text for CLI."""

114

```

115

116

Usage Example:

117

118

```python

119

# Basic agent

120

@app.agent(app.topic('orders'))

121

async def process_orders(orders):

122

async for order in orders:

123

print(f'Processing order: {order}')

124

125

# Agent with RPC support

126

@app.agent(app.topic('calculations'))

127

async def calculator(calculations):

128

async for calc in calculations:

129

result = perform_calculation(calc.operation, calc.values)

130

await calc.send(

131

calc.reply_to,

132

key=calc.correlation_id,

133

value=result

134

)

135

136

# Sending to agents

137

await process_orders.send({'id': 1, 'amount': 100})

138

result = await calculator.ask({'operation': 'sum', 'values': [1, 2, 3]})

139

calculator.cast({'operation': 'multiply', 'values': [4, 5]})

140

```

141

142

### Stream Class

143

144

Stream processing interface providing transformation operations on data streams. Streams are async iterators that can be chained with functional programming operations.

145

146

```python { .api }

147

class Stream:

148

def __init__(self, channel, **kwargs):

149

"""

150

Create stream from channel.

151

152

Args:

153

channel: Input channel

154

**kwargs: Stream options

155

"""

156

157

def __aiter__(self):

158

"""Async iterator interface."""

159

160

async def __anext__(self):

161

"""Get next item from stream."""

162

163

def items(self):

164

"""

165

Iterate over (key, value) pairs.

166

167

Returns:

168

Stream of (key, value) tuples

169

"""

170

171

def filter(self, fun):

172

"""

173

Filter stream items based on predicate.

174

175

Args:

176

fun: Predicate function (item) -> bool

177

178

Returns:

179

Filtered stream

180

"""

181

182

def map(self, fun):

183

"""

184

Transform stream items.

185

186

Args:

187

fun: Transform function (item) -> new_item

188

189

Returns:

190

Transformed stream

191

"""

192

193

def group_by(

194

self,

195

key,

196

*,

197

name: str = None,

198

topic: str = None

199

):

200

"""

201

Group stream by key function.

202

203

Args:

204

key: Key extraction function

205

name: Group name

206

topic: Intermediate topic for grouping

207

208

Returns:

209

Grouped stream

210

"""

211

212

def take(

213

self,

214

max_: int,

215

*,

216

within: float = None

217

):

218

"""

219

Take at most N items from stream.

220

221

Args:

222

max_: Maximum number of items

223

within: Time window in seconds

224

225

Returns:

226

Limited stream

227

"""

228

229

def rate_limit(

230

self,

231

rate: float,

232

*,

233

per: float = 1.0,

234

within: float = None

235

):

236

"""

237

Rate limit stream processing.

238

239

Args:

240

rate: Maximum rate (items per second)

241

per: Time period for rate calculation

242

within: Rate limit window

243

244

Returns:

245

Rate-limited stream

246

"""

247

248

def buffer(

249

self,

250

size: int,

251

*,

252

timeout: float = None

253

):

254

"""

255

Buffer stream items.

256

257

Args:

258

size: Buffer size

259

timeout: Buffer flush timeout

260

261

Returns:

262

Buffered stream

263

"""

264

265

def through(self, channel, **kwargs):

266

"""

267

Route stream through another channel.

268

269

Args:

270

channel: Target channel

271

**kwargs: Routing options

272

273

Returns:

274

Routed stream

275

"""

276

277

def echo(self, *args, **kwargs):

278

"""

279

Echo stream items to stdout.

280

281

Returns:

282

Echo stream

283

"""

284

285

def join(self, *streams, **kwargs):

286

"""

287

Join with other streams.

288

289

Args:

290

*streams: Streams to join with

291

**kwargs: Join options

292

293

Returns:

294

Joined stream

295

"""

296

297

def combine(self, *streams, **kwargs):

298

"""

299

Combine with other streams.

300

301

Args:

302

*streams: Streams to combine

303

**kwargs: Combine options

304

305

Returns:

306

Combined stream

307

"""

308

309

def concat(self, *streams, **kwargs):

310

"""

311

Concatenate with other streams.

312

313

Args:

314

*streams: Streams to concatenate

315

**kwargs: Concat options

316

317

Returns:

318

Concatenated stream

319

"""

320

321

def tee(self, *streams, **kwargs):

322

"""

323

Split stream to multiple outputs.

324

325

Args:

326

*streams: Output streams

327

**kwargs: Tee options

328

329

Returns:

330

Teed stream

331

"""

332

```

333

334

Usage Example:

335

336

```python

337

# Basic stream processing

338

@app.agent(app.topic('numbers'))

339

async def process_numbers(stream):

340

async for number in stream:

341

print(f'Number: {number}')

342

343

# Stream transformations

344

@app.agent(app.topic('raw-data'))

345

async def transform_data(stream):

346

async for item in stream.filter(lambda x: x.is_valid).map(lambda x: x.processed_value):

347

await save_processed_item(item)

348

349

# Stream grouping and aggregation

350

@app.agent(app.topic('events'))

351

async def aggregate_events(stream):

352

async for user_id, events in stream.group_by(lambda event: event.user_id):

353

count = 0

354

async for event in events:

355

count += 1

356

if count >= 10:

357

await send_alert(user_id, count)

358

count = 0

359

360

# Rate limiting and buffering

361

@app.agent(app.topic('api-calls'))

362

async def rate_limited_processing(stream):

363

async for batch in stream.rate_limit(100.0).buffer(50, timeout=5.0):

364

await process_batch(batch)

365

```

366

367

### Event Class

368

369

Event container representing a single message in a stream with metadata, acknowledgment capabilities, and forwarding operations.

370

371

```python { .api }

372

class Event:

373

def __init__(

374

self,

375

key=None,

376

value=None,

377

headers: dict = None,

378

message=None,

379

timestamp: float = None

380

):

381

"""

382

Stream event container.

383

384

Args:

385

key: Event key

386

value: Event value

387

headers: Event headers

388

message: Underlying message object

389

timestamp: Event timestamp

390

"""

391

392

def ack(self):

393

"""Acknowledge event processing."""

394

395

def reject(self):

396

"""Reject event (negative acknowledgment)."""

397

398

async def send(

399

self,

400

channel,

401

key=None,

402

value=None,

403

partition: int = None,

404

timestamp: float = None,

405

headers: dict = None,

406

**kwargs

407

):

408

"""

409

Send new event to channel.

410

411

Args:

412

channel: Target channel

413

key: Event key (defaults to current key)

414

value: Event value (defaults to current value)

415

partition: Target partition

416

timestamp: Event timestamp

417

headers: Event headers

418

**kwargs: Additional options

419

"""

420

421

async def forward(

422

self,

423

channel,

424

*,

425

key=None,

426

value=None,

427

partition: int = None,

428

timestamp: float = None,

429

headers: dict = None,

430

**kwargs

431

):

432

"""

433

Forward event to another channel.

434

435

Args:

436

channel: Target channel

437

key: Override key

438

value: Override value

439

partition: Target partition

440

timestamp: Override timestamp

441

headers: Additional headers

442

**kwargs: Additional options

443

"""

444

445

@property

446

def key(self):

447

"""Event key."""

448

449

@property

450

def value(self):

451

"""Event value."""

452

453

@property

454

def headers(self) -> dict:

455

"""Event headers."""

456

457

@property

458

def message(self):

459

"""Underlying message object."""

460

461

@property

462

def timestamp(self) -> float:

463

"""Event timestamp (Unix timestamp)."""

464

```

465

466

Usage Example:

467

468

```python

469

# Working with events

470

@app.agent(app.topic('transactions'))

471

async def process_transactions(stream):

472

async for event in stream.events():

473

try:

474

# Process the transaction

475

result = await process_transaction(event.value)

476

477

# Forward successful results

478

await event.forward(

479

success_topic,

480

value=result,

481

headers={'processed_at': time.time()}

482

)

483

484

# Acknowledge processing

485

event.ack()

486

487

except ProcessingError as exc:

488

# Forward to error topic

489

await event.forward(

490

error_topic,

491

value={'error': str(exc), 'original': event.value}

492

)

493

event.ack() # Still ack to avoid reprocessing

494

495

except FatalError:

496

# Reject for reprocessing

497

event.reject()

498

499

# Creating custom events

500

async def send_notification(user_id, message):

501

event = Event(

502

key=user_id,

503

value=message,

504

headers={'type': 'notification', 'priority': 'high'},

505

timestamp=time.time()

506

)

507

await event.send(notifications_topic)

508

```

509

510

### Current Event Access

511

512

Function for accessing the event that is currently being processed within an agent context.

513

514

```python { .api }

515

def current_event() -> Event:

516

"""

517

Get the currently processing event.

518

519

Returns:

520

Current event instance

521

522

Raises:

523

RuntimeError: If called outside agent context

524

"""

525

```

526

527

Usage Example:

528

529

```python

530

@app.agent(app.topic('orders'))

531

async def process_orders(orders):

532

async for order in orders:

533

# Get current event for metadata access

534

event = faust.current_event()

535

536

# Log processing with event metadata

537

print(f'Processing order {order.id} from partition {event.message.partition}')

538

539

# Forward based on event headers

540

if event.headers.get('priority') == 'high':

541

await event.forward(priority_processing_topic)

542

else:

543

await event.forward(normal_processing_topic)

544

545

event.ack()

546

```

547

548

## Type Interfaces

549

550

```python { .api }

551

from typing import Protocol, AsyncIterator

552

553

class AgentT(Protocol):

554

"""Type interface for stream processing agents."""

555

556

async def send(self, value=None, *, key=None, partition=None): ...

557

async def ask(self, value=None, *, key=None, **kwargs): ...

558

def cast(self, value=None, *, key=None, partition=None): ...

559

560

class StreamT(Protocol):

561

"""Type interface for data streams."""

562

563

def __aiter__(self) -> AsyncIterator: ...

564

def filter(self, fun): ...

565

def map(self, fun): ...

566

def group_by(self, key, **kwargs): ...

567

568

class EventT(Protocol):

569

"""Type interface for stream events."""

570

571

key: object

572

value: object

573

headers: dict

574

timestamp: float

575

576

def ack(self): ...

577

def reject(self): ...

578

async def send(self, channel, **kwargs): ...

579

async def forward(self, channel, **kwargs): ...

580

```