# Stream Processing

Stream processing components for consuming and transforming data streams in real time. They include agents that process message streams, transformation operations on streams, and event handling for building reactive data-processing pipelines.

## Capabilities

### Agent Class

Stream processing agents consume from channels or topics. Agents are async functions decorated with `@app.agent()`, which automatically handles message consumption, acknowledgment, error handling, and scaling across concurrent instances.
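As a rough mental model of what the decorator manages, the sketch below runs `concurrency` copies of an agent function against one shared `asyncio.Queue`. Each item is acknowledged (via `task_done()`) only after the agent asks for the next one. This is a pure-asyncio illustration, not the framework's implementation; `run_agent`, `_iterate` and the `_STOP` sentinel are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List

_STOP = object()  # hypothetical sentinel telling one worker to finish


async def _iterate(queue: asyncio.Queue) -> AsyncIterator:
    """Yield queued items until the stop sentinel arrives."""
    while True:
        item = await queue.get()
        try:
            if item is _STOP:
                return
            yield item  # the agent body processes the item here
        finally:
            queue.task_done()  # "ack": runs once the agent requests the next item


async def run_agent(fun: Callable[[AsyncIterator], Awaitable[None]],
                    items: List, concurrency: int = 1) -> None:
    """Run `concurrency` copies of `fun` that share one input queue."""
    queue: asyncio.Queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    for _ in range(concurrency):
        queue.put_nowait(_STOP)  # one sentinel per worker
    await asyncio.gather(*(fun(_iterate(queue)) for _ in range(concurrency)))
    await queue.join()  # every item and sentinel has been acknowledged


processed: List[int] = []


async def process_orders(orders: AsyncIterator) -> None:
    async for order in orders:
        await asyncio.sleep(0)  # simulate I/O so the workers interleave
        processed.append(order['id'])


asyncio.run(run_agent(process_orders, [{'id': i} for i in range(5)], concurrency=2))
print(sorted(processed))  # [0, 1, 2, 3, 4]
```

Sharing a single queue means any idle worker can take the next item, which is why the processing order across workers is not fixed.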
```python { .api }
class Agent:
    def __init__(
        self,
        fun,
        *,
        channel=None,
        name: str = None,
        concurrency: int = 1,
        sink: list = None,
        on_error: callable = None,
        supervisor_strategy: str = None,
        help: str = None,
        **kwargs
    ):
        """
        Stream processing agent.

        Args:
            fun: Async function to process stream
            channel: Channel or topic to consume from
            name: Agent name
            concurrency: Number of concurrent instances
            sink: Channels to forward results to
            on_error: Error handler function
            supervisor_strategy: Error recovery strategy
            help: Help text
        """

    async def send(
        self,
        value=None,
        *,
        key=None,
        partition: int = None
    ):
        """
        Send message to agent's channel.

        Args:
            value: Message value
            key: Message key
            partition: Target partition
        """

    async def ask(
        self,
        value=None,
        *,
        key=None,
        partition: int = None,
        reply_to: str = None,
        correlation_id: str = None
    ):
        """
        Send message and wait for reply (RPC-style).

        Args:
            value: Message value
            key: Message key
            partition: Target partition
            reply_to: Reply topic
            correlation_id: Request correlation ID

        Returns:
            Reply message
        """

    def cast(
        self,
        value=None,
        *,
        key=None,
        partition: int = None
    ):
        """
        Send message without waiting for a reply (fire-and-forget).

        Args:
            value: Message value
            key: Message key
            partition: Target partition
        """

    async def start(self):
        """Start the agent."""

    async def stop(self):
        """Stop the agent."""

    def cancel(self):
        """Cancel the agent."""

    @property
    def channel(self):
        """Agent's input channel."""

    @property
    def concurrency(self) -> int:
        """Number of concurrent instances."""

    @property
    def help(self) -> str:
        """Help text for CLI."""
```
Usage Example:

```python
# Assumes `app = faust.App(...)` has been created elsewhere.

# Basic agent
@app.agent(app.topic('orders'))
async def process_orders(orders):
    async for order in orders:
        print(f'Processing order: {order}')

# Agent with RPC support: a value yielded by the agent is sent back
# as the reply to a caller that used ask()
@app.agent(app.topic('calculations'))
async def calculator(calculations):
    async for calc in calculations:
        # perform_calculation is application code; values arrive as dicts
        result = perform_calculation(calc['operation'], calc['values'])
        yield result

# Sending to agents (from inside a coroutine, e.g. a timer or another agent)
await process_orders.send(value={'id': 1, 'amount': 100})
result = await calculator.ask(value={'operation': 'sum', 'values': [1, 2, 3]})
calculator.cast(value={'operation': 'multiply', 'values': [4, 5]})
```
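The difference between `send()`, `ask()` and `cast()` is who waits for what. The hypothetical `ToyAgent` below models this in-process: `ask()` registers a future under a fresh correlation ID and waits for the agent to resolve it, while `send()` and `cast()` only enqueue the message. Over a real broker, replies travel through a reply topic and are matched by correlation ID; the in-memory dict of futures here is a simplifying assumption.

```python
import asyncio
import itertools
import math
from typing import Any, Awaitable, Callable, Dict, Optional


class ToyAgent:
    """Hypothetical in-process agent illustrating send/ask/cast semantics."""

    def __init__(self, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handler = handler
        self._queue: asyncio.Queue = asyncio.Queue()  # unbounded
        self._pending: Dict[int, asyncio.Future] = {}
        self._ids = itertools.count()

    async def run(self) -> None:
        """Process messages forever; resolve the caller's future for ask()."""
        while True:
            correlation_id, value = await self._queue.get()
            result = await self._handler(value)
            fut: Optional[asyncio.Future] = self._pending.pop(correlation_id, None)
            if fut is not None:  # only ask() registers a future
                fut.set_result(result)

    async def send(self, value: Any) -> None:
        """Enqueue a message; no reply is tracked."""
        await self._queue.put((None, value))

    def cast(self, value: Any) -> None:
        """Fire-and-forget: enqueue without waiting (the queue is unbounded)."""
        self._queue.put_nowait((None, value))

    async def ask(self, value: Any) -> Any:
        """Enqueue with a fresh correlation ID and wait for the result."""
        correlation_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = fut
        await self._queue.put((correlation_id, value))
        return await fut


async def calculate(req: Dict[str, Any]) -> Any:
    ops = {'sum': sum, 'multiply': math.prod}
    return ops[req['operation']](req['values'])


async def main() -> Any:
    agent = ToyAgent(calculate)
    worker = asyncio.create_task(agent.run())
    agent.cast({'operation': 'multiply', 'values': [4, 5]})  # result discarded
    result = await agent.ask({'operation': 'sum', 'values': [1, 2, 3]})
    worker.cancel()
    return result


print(asyncio.run(main()))  # 6
```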
### Stream Class

Stream processing interface providing transformation operations on data streams. Streams are async iterators that can be chained with functional programming operations.
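Chaining works because each operator returns a new stream that wraps the previous one, and nothing runs until the final stream is iterated. The hypothetical `MiniStream` class below sketches this with async generators, implementing only `filter()` and `map()` and using an in-memory source in place of a topic.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Callable, Iterable


class MiniStream:
    """Hypothetical stream: each operator wraps its source in a new async generator."""

    def __init__(self, source: AsyncIterable) -> None:
        self._source = source

    def __aiter__(self) -> AsyncIterator:
        return self._source.__aiter__()

    def filter(self, fun: Callable) -> 'MiniStream':
        async def gen():
            async for item in self._source:
                if fun(item):
                    yield item
        return MiniStream(gen())

    def map(self, fun: Callable) -> 'MiniStream':
        async def gen():
            async for item in self._source:
                yield fun(item)
        return MiniStream(gen())


async def from_iterable(items: Iterable) -> AsyncIterator:
    """Async source standing in for a topic."""
    for item in items:
        await asyncio.sleep(0)  # yield control, as a real consumer would
        yield item


async def main() -> list:
    stream = MiniStream(from_iterable(range(10)))
    evens_squared = stream.filter(lambda n: n % 2 == 0).map(lambda n: n * n)
    return [x async for x in evens_squared]


print(asyncio.run(main()))  # [0, 4, 16, 36, 64]
```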
```python { .api }
class Stream:
    def __init__(self, channel, **kwargs):
        """
        Create stream from channel.

        Args:
            channel: Input channel
            **kwargs: Stream options
        """

    def __aiter__(self):
        """Async iterator interface."""

    async def __anext__(self):
        """Get next item from stream."""

    def items(self):
        """
        Iterate over (key, value) pairs.

        Returns:
            Stream of (key, value) tuples
        """

    def events(self):
        """
        Iterate over Event objects (value plus key, headers and message
        metadata) instead of bare values.

        Returns:
            Stream of Event instances
        """

    def filter(self, fun):
        """
        Filter stream items based on predicate.

        Args:
            fun: Predicate function (item) -> bool

        Returns:
            Filtered stream
        """

    def map(self, fun):
        """
        Transform stream items.

        Args:
            fun: Transform function (item) -> new_item

        Returns:
            Transformed stream
        """

    def group_by(
        self,
        key,
        *,
        name: str = None,
        topic: str = None
    ):
        """
        Repartition the stream by key, so that all items sharing a key
        are processed by the same agent instance.

        Args:
            key: Key extraction function or record field
            name: Suffix for the intermediate topic name (required
                when key is a function)
            topic: Intermediate topic for grouping

        Returns:
            Repartitioned stream yielding the same items
        """

    def take(
        self,
        max_: int,
        *,
        within: float = None
    ):
        """
        Collect items into batches of at most max_ items.

        Args:
            max_: Maximum number of items per batch
            within: Maximum seconds to wait before yielding a
                partial batch

        Returns:
            Stream of lists of items
        """

    def rate_limit(
        self,
        rate: float,
        *,
        per: float = 1.0,
        within: float = None
    ):
        """
        Rate limit stream processing.

        Args:
            rate: Maximum number of items per `per` seconds
            per: Length of the rate period in seconds
            within: Rate limit window

        Returns:
            Rate-limited stream
        """

    def buffer(
        self,
        size: int,
        *,
        timeout: float = None
    ):
        """
        Buffer stream items.

        Args:
            size: Buffer size
            timeout: Buffer flush timeout

        Returns:
            Buffered stream
        """

    def through(self, channel, **kwargs):
        """
        Route stream through another channel.

        Args:
            channel: Target channel
            **kwargs: Routing options

        Returns:
            Routed stream
        """

    def echo(self, *channels, **kwargs):
        """
        Forward stream items to one or more channels without consuming
        from them (unlike through()).

        Args:
            *channels: Channels to forward items to

        Returns:
            Stream yielding the same items
        """

    def join(self, *streams, **kwargs):
        """
        Join with other streams.

        Args:
            *streams: Streams to join with
            **kwargs: Join options

        Returns:
            Joined stream
        """

    def combine(self, *streams, **kwargs):
        """
        Combine with other streams.

        Args:
            *streams: Streams to combine
            **kwargs: Combine options

        Returns:
            Combined stream
        """

    def concat(self, *streams, **kwargs):
        """
        Concatenate with other streams.

        Args:
            *streams: Streams to concatenate
            **kwargs: Concat options

        Returns:
            Concatenated stream
        """

    def tee(self, *streams, **kwargs):
        """
        Split stream to multiple outputs.

        Args:
            *streams: Output streams
            **kwargs: Tee options

        Returns:
            Teed stream
        """
```
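Size-or-time batching, as described for `buffer(size, timeout=...)` and the `within` window of `take()`, can be sketched as an async generator that emits a batch when it is full or when the window since its first item expires. The `batched` function and `END` sentinel are hypothetical, and an `asyncio.Queue` stands in for the underlying channel.

```python
import asyncio
from typing import AsyncIterator, List, Optional

END = object()  # hypothetical end-of-stream marker


async def batched(queue: asyncio.Queue, max_: int,
                  within: Optional[float] = None) -> AsyncIterator[List]:
    """Yield lists of up to `max_` items; if `within` is set, flush a
    partial batch once that many seconds pass after its first item."""
    loop = asyncio.get_running_loop()
    batch: List = []
    deadline: Optional[float] = None
    while True:
        timeout = None if deadline is None else max(0.0, deadline - loop.time())
        try:
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            yield batch  # window expired: emit the partial batch
            batch, deadline = [], None
            continue
        if item is END:
            if batch:
                yield batch  # flush whatever is left
            return
        batch.append(item)
        if deadline is None and within is not None:
            deadline = loop.time() + within  # window starts at the first item
        if len(batch) >= max_:
            yield batch
            batch, deadline = [], None


async def main() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for n in range(5):
        queue.put_nowait(n)

    async def late_producer() -> None:
        await asyncio.sleep(0.3)  # idle gap longer than `within`
        queue.put_nowait(5)
        queue.put_nowait(END)

    producer = asyncio.create_task(late_producer())
    batches = [batch async for batch in batched(queue, max_=2, within=0.05)]
    await producer
    return batches


print(asyncio.run(main()))  # [[0, 1], [2, 3], [4], [5]]
```

The lone `4` is emitted by the timeout rather than by the size limit, which is the case the time window exists for.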
Usage Example:

```python
# Basic stream processing
@app.agent(app.topic('numbers'))
async def process_numbers(stream):
    async for number in stream:
        print(f'Number: {number}')

# Stream transformations
@app.agent(app.topic('raw-data'))
async def transform_data(stream):
    valid = stream.filter(lambda x: x.is_valid)
    async for item in valid.map(lambda x: x.processed_value):
        await save_processed_item(item)

# Stream grouping and aggregation
@app.agent(app.topic('events'))
async def aggregate_events(stream):
    # group_by repartitions by key, so all events for one user reach the
    # same agent instance; iteration still yields one event at a time.
    # A plain dict is not persisted; use a Table for durable counts.
    counts = {}
    async for event in stream.group_by(lambda event: event.user_id, name='user_id'):
        counts[event.user_id] = counts.get(event.user_id, 0) + 1
        if counts[event.user_id] >= 10:
            await send_alert(event.user_id, counts[event.user_id])
            counts[event.user_id] = 0

# Rate limiting and buffering
@app.agent(app.topic('api-calls'))
async def rate_limited_processing(stream):
    async for batch in stream.rate_limit(100.0).buffer(50, timeout=5.0):
        await process_batch(batch)
```
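Conceptually, grouping by key works by routing every item to a partition chosen from its key, so all items for one key reach the same consumer and per-key state can be kept locally. The sketch below assumes CRC32 as the partitioning hash (Kafka's default partitioner uses murmur2) and plain lists in place of topic partitions; all function names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition (CRC32 as a stand-in hash)."""
    return zlib.crc32(key.encode('utf-8')) % num_partitions


def repartition(events: Iterable[dict], key_field: str,
                num_partitions: int) -> Dict[int, List[dict]]:
    """Route each event to the partition that owns its key."""
    partitions: Dict[int, List[dict]] = defaultdict(list)
    for event in events:
        partitions[partition_for(event[key_field], num_partitions)].append(event)
    return dict(partitions)


def count_per_key(partition: List[dict], key_field: str) -> Dict[str, int]:
    """Local per-key count; complete because no key spans two partitions."""
    counts: Dict[str, int] = defaultdict(int)
    for event in partition:
        counts[event[key_field]] += 1
    return dict(counts)


events = [{'user_id': u} for u in ['alice', 'bob', 'alice', 'carol', 'alice', 'bob']]
parts = repartition(events, 'user_id', num_partitions=4)

totals: Dict[str, int] = {}
for partition in parts.values():
    totals.update(count_per_key(partition, 'user_id'))  # keys never collide

print(sorted(totals.items()))  # [('alice', 3), ('bob', 2), ('carol', 1)]
```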
### Event Class

Event container representing a single message in a stream with metadata, acknowledgment capabilities, and forwarding operations.

```python { .api }
class Event:
    def __init__(
        self,
        key=None,
        value=None,
        headers: dict = None,
        message=None,
        timestamp: float = None
    ):
        """
        Stream event container.

        Args:
            key: Event key
            value: Event value
            headers: Event headers
            message: Underlying message object
            timestamp: Event timestamp
        """

    def ack(self):
        """Acknowledge event processing."""

    def reject(self):
        """Reject event (negative acknowledgment)."""

    async def send(
        self,
        channel,
        key=None,
        value=None,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        **kwargs
    ):
        """
        Send new event to channel.

        Args:
            channel: Target channel
            key: Event key (defaults to current key)
            value: Event value (defaults to current value)
            partition: Target partition
            timestamp: Event timestamp
            headers: Event headers
            **kwargs: Additional options
        """

    async def forward(
        self,
        channel,
        *,
        key=None,
        value=None,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        **kwargs
    ):
        """
        Forward event to another channel.

        Args:
            channel: Target channel
            key: Override key
            value: Override value
            partition: Target partition
            timestamp: Override timestamp
            headers: Additional headers
            **kwargs: Additional options
        """

    @property
    def key(self):
        """Event key."""

    @property
    def value(self):
        """Event value."""

    @property
    def headers(self) -> dict:
        """Event headers."""

    @property
    def message(self):
        """Underlying message object."""

    @property
    def timestamp(self) -> float:
        """Event timestamp (Unix timestamp)."""
```
Usage Example:

```python
import time

# Placeholder topics; the names are illustrative.
success_topic = app.topic('transactions-success')
error_topic = app.topic('transactions-errors')
notifications_topic = app.topic('notifications')

# Working with events
# (process_transaction, ProcessingError and FatalError are application code)
@app.agent(app.topic('transactions'))
async def process_transactions(stream):
    async for event in stream.events():
        try:
            # Process the transaction
            result = await process_transaction(event.value)

            # Forward successful results (Kafka header values are bytes)
            await event.forward(
                success_topic,
                value=result,
                headers={'processed_at': str(time.time()).encode()},
            )

            # Acknowledge processing
            event.ack()

        except ProcessingError as exc:
            # Forward to error topic
            await event.forward(
                error_topic,
                value={'error': str(exc), 'original': event.value},
            )
            event.ack()  # Still ack to avoid reprocessing

        except FatalError:
            # Reject for reprocessing
            event.reject()

# Creating custom events
async def send_notification(user_id, message):
    event = Event(
        key=user_id,
        value=message,
        headers={'type': b'notification', 'priority': b'high'},
        timestamp=time.time(),
    )
    await event.send(notifications_topic)
```
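The error-handling policy in the example above (ack after recording a recoverable error, reject to retry) can be exercised without a broker. In this sketch, `ToyEvent`, `consume` and the two exception classes are hypothetical, and any event that is not acked is redelivered up to `max_attempts` times, modelling at-least-once delivery.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Deque, Dict, List, Tuple


class ProcessingError(Exception):
    """Hypothetical recoverable error: record it and move on."""


class FatalError(Exception):
    """Hypothetical error that triggers redelivery, as in the example above."""


@dataclass
class ToyEvent:
    value: Any
    attempts: int = 0
    acked: bool = False

    def ack(self) -> None:
        self.acked = True

    def reject(self) -> None:
        self.acked = False  # negative acknowledgment: leave it for redelivery


def consume(values: List[Any], handler: Callable[[Any], Any],
            max_attempts: int = 3) -> Tuple[List[Any], List[Dict[str, Any]]]:
    """Deliver values until each is acked or out of attempts.

    Returns (results, dead_letters).
    """
    queue: Deque[ToyEvent] = deque(ToyEvent(v) for v in values)
    results: List[Any] = []
    dead_letters: List[Dict[str, Any]] = []
    while queue:
        event = queue.popleft()
        event.attempts += 1
        try:
            results.append(handler(event.value))
            event.ack()
        except ProcessingError as exc:
            dead_letters.append({'error': str(exc), 'original': event.value})
            event.ack()  # still ack to avoid reprocessing
        except FatalError:
            event.reject()
        if not event.acked:
            if event.attempts < max_attempts:
                queue.append(event)  # redeliver later (at-least-once)
            else:
                dead_letters.append({'error': 'retries exhausted', 'original': event.value})
    return results, dead_letters


calls: Dict[str, int] = {}


def handler(value: str) -> str:
    calls[value] = calls.get(value, 0) + 1
    if value == 'bad':
        raise ProcessingError('invalid payload')
    if value == 'flaky' and calls[value] < 2:
        raise FatalError('temporary outage')  # fails on the first delivery only
    return value.upper()


results, dead = consume(['ok', 'bad', 'flaky'], handler)
print(results)  # ['OK', 'FLAKY']
print(dead)     # [{'error': 'invalid payload', 'original': 'bad'}]
```

Capping the attempts keeps a permanently failing message from being redelivered forever.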
### Current Event Access

Function to access the event currently being processed within an agent context.

```python { .api }
from typing import Optional

def current_event() -> Optional[Event]:
    """
    Get the currently processing event.

    Returns:
        Current event instance, or None if called outside a
        stream-processing context
    """
```
Usage Example:

```python
import faust

# priority_processing_topic and normal_processing_topic are topics defined elsewhere
@app.agent(app.topic('orders'))
async def process_orders(orders):
    async for order in orders:
        # Get current event for metadata access
        event = faust.current_event()

        # Log processing with event metadata
        print(f'Processing order {order.id} from partition {event.message.partition}')

        # Forward based on event headers (header values are bytes)
        if event.headers.get('priority') == b'high':
            await event.forward(priority_processing_topic)
        else:
            await event.forward(normal_processing_topic)

        event.ack()
```
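One way to make a function like `current_event()` work without passing the event around is a context variable that the delivery loop sets before calling the handler. Because every `asyncio` task runs in its own copy of the context, concurrent agents do not see each other's events. This is a sketch under that assumption, with hypothetical `deliver` and `_current_event` names; it returns `None` outside a processing context.

```python
import asyncio
import contextvars
from typing import Any, Awaitable, Callable, List, Optional

# Hypothetical slot holding the event currently being processed.
_current_event: contextvars.ContextVar = contextvars.ContextVar(
    'current_event', default=None)


def current_event() -> Optional[Any]:
    """Return the event being processed by this task, or None."""
    return _current_event.get()


async def deliver(events: List[dict],
                  handler: Callable[[Any], Awaitable[None]]) -> None:
    """Call handler(value) for each event with the context variable set."""
    for event in events:
        token = _current_event.set(event)
        try:
            await handler(event['value'])
        finally:
            _current_event.reset(token)  # restore the previous value


seen: List[tuple] = []


async def handler(value: Any) -> None:
    await asyncio.sleep(0)  # switch tasks mid-processing
    seen.append((value, current_event()['partition']))


async def main() -> None:
    # gather() wraps each coroutine in its own task with its own context copy
    await asyncio.gather(
        deliver([{'value': 'a1', 'partition': 0}, {'value': 'a2', 'partition': 0}], handler),
        deliver([{'value': 'b1', 'partition': 1}], handler),
    )


asyncio.run(main())
print(sorted(seen))     # [('a1', 0), ('a2', 0), ('b1', 1)]
print(current_event())  # None
```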
## Type Interfaces

```python { .api }
from typing import AsyncIterator, Protocol


class AgentT(Protocol):
    """Type interface for stream processing agents."""

    async def send(self, value=None, *, key=None, partition=None): ...
    async def ask(self, value=None, *, key=None, **kwargs): ...
    def cast(self, value=None, *, key=None, partition=None): ...


class StreamT(Protocol):
    """Type interface for data streams."""

    def __aiter__(self) -> AsyncIterator: ...
    def filter(self, fun): ...
    def map(self, fun): ...
    def group_by(self, key, **kwargs): ...


class EventT(Protocol):
    """Type interface for stream events."""

    key: object
    value: object
    headers: dict
    timestamp: float

    def ack(self): ...
    def reject(self): ...
    async def send(self, channel, **kwargs): ...
    async def forward(self, channel, **kwargs): ...
```
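Because these are `typing.Protocol` classes, any object with matching members satisfies them; no inheritance is required. The sketch below uses a reduced, hypothetical `EventLike` protocol marked `@runtime_checkable`, so `isinstance()` can check for the `ack`/`reject` methods, and a `RecordingEvent` test double that conforms structurally.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class EventLike(Protocol):
    """Hypothetical reduced EventT: only the acknowledgment methods."""

    def ack(self) -> None: ...
    def reject(self) -> None: ...


class RecordingEvent:
    """Test double that satisfies EventLike without inheriting from it."""

    def __init__(self, value: Any) -> None:
        self.value = value
        self.status = 'pending'

    def ack(self) -> None:
        self.status = 'acked'

    def reject(self) -> None:
        self.status = 'rejected'


def settle(event: EventLike, ok: bool) -> None:
    """Code written against the protocol accepts any matching object."""
    if ok:
        event.ack()
    else:
        event.reject()


e = RecordingEvent({'id': 1})
settle(e, ok=False)
print(isinstance(e, EventLike), e.status)  # True rejected
print(isinstance(object(), EventLike))     # False
```

Runtime checks only confirm that the methods exist, not their signatures, so a static type checker remains the main safeguard.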