0
# Data Management
1
2
Stateful data management through tables and models in Faust applications. Tables provide distributed key-value storage with changelog-based replication, while models offer structured data definitions with type-safe serialization and validation capabilities.
3
4
## Capabilities
5
6
### Table Storage
7
8
Distributed key-value storage interface with changelog-based replication for stateful stream processing. Tables automatically maintain consistency across application instances. Each instance serves the table partitions assigned to it; use a global table (below) when every instance needs access to the full data set.
9
10
```python { .api }
11
from typing import Any, Callable, Iterator, Optional, Tuple


class Table:
    """Distributed key-value table replicated through a changelog topic."""

    def __init__(
        self,
        app: 'App',
        *,
        name: str,
        default: Optional[Callable[[], Any]] = None,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        partitions: Optional[int] = None,
        window: Optional['Window'] = None,
        changelog_topic: Optional['Topic'] = None,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Create a new distributed table.

        Args:
            app: The Faust application instance
            name: Table name (used for changelog topic)
            default: Default value factory function, called with no
                arguments (e.g. ``int`` or ``lambda: {...}``)
            key_type: Type for table keys
            value_type: Type for table values
            partitions: Number of changelog partitions
            window: Window specification for windowed tables
            changelog_topic: Custom changelog topic
            help: Help text for CLI
        """

    def __getitem__(self, key: Any) -> Any:
        """
        Get value by key.

        Args:
            key: Table key

        Returns:
            Value associated with key

        Raises:
            KeyError: If key not found and no default
        """

    def __setitem__(self, key: Any, value: Any) -> None:
        """
        Set key-value pair.

        Args:
            key: Table key
            value: Value to store
        """

    def __delitem__(self, key: Any) -> None:
        """
        Delete key from table.

        Args:
            key: Key to delete

        Raises:
            KeyError: If key not found
        """

    def __contains__(self, key: Any) -> bool:
        """
        Check if key exists in table.

        Args:
            key: Key to check

        Returns:
            True if key exists
        """

    def get(self, key: Any, default: Any = None) -> Any:
        """
        Get value by key with optional default.

        Args:
            key: Table key
            default: Default value if key not found

        Returns:
            Value or default
        """

    def setdefault(self, key: Any, default: Any = None) -> Any:
        """
        Get value or set and return default.

        Args:
            key: Table key
            default: Default value to set if key missing

        Returns:
            Existing value or newly set default
        """

    def pop(self, key: Any, *default: Any) -> Any:
        """
        Remove key and return value.

        Args:
            key: Key to remove
            *default: Optional default if key not found

        Returns:
            Value that was removed

        Raises:
            KeyError: If key not found and no default provided
        """

    def update(self, *args: Any, **kwargs: Any) -> None:
        """
        Update table with key-value pairs.

        Args:
            *args: Mapping or iterable of pairs
            **kwargs: Keyword arguments as key-value pairs
        """

    def clear(self) -> None:
        """Remove all items from table."""

    def items(self) -> Iterator[Tuple[Any, Any]]:
        """
        Iterate over key-value pairs.

        Returns:
            Iterator of (key, value) tuples
        """

    def keys(self) -> Iterator[Any]:
        """
        Iterate over keys.

        Returns:
            Iterator of keys
        """

    def values(self) -> Iterator[Any]:
        """
        Iterate over values.

        Returns:
            Iterator of values
        """

    def copy(self) -> dict:
        """
        Create a dictionary copy of table contents.

        Returns:
            Dictionary with current table state
        """

    @property
    def name(self) -> str:
        """Table name."""

    @property
    def default(self) -> Optional[Callable[[], Any]]:
        """Default value factory."""

    @property
    def key_type(self) -> Optional[type]:
        """Type for table keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for table values."""

    @property
    def changelog_topic(self) -> Optional['Topic']:
        """Changelog topic for replication."""
188
```
189
190
### Global Table
191
192
Global table providing read-only access to table data across all application instances, regardless of partition assignment. Useful for lookup tables and reference data that all instances need access to.
193
194
```python { .api }
195
from typing import Any, Callable, Optional


class GlobalTable(Table):
    """Table whose data is readable from every application instance."""

    def __init__(
        self,
        app: 'App',
        *,
        name: str,
        default: Optional[Callable[[], Any]] = None,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        changelog_topic: Optional['Topic'] = None,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Create a new global table with read access from all instances.

        Args:
            app: The Faust application instance
            name: Table name
            default: Default value factory function, called with no arguments
            key_type: Type for table keys
            value_type: Type for table values
            changelog_topic: Custom changelog topic
            help: Help text for CLI
        """

    def __setitem__(self, key: Any, value: Any) -> None:
        """
        Set operations not supported on global tables.

        Raises:
            NotImplementedError: Global tables are read-only
        """

    def __delitem__(self, key: Any) -> None:
        """
        Delete operations not supported on global tables.

        Raises:
            NotImplementedError: Global tables are read-only
        """
236
```
237
238
### Set Tables
239
240
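Specialized tables whose values are sets: each key maps to a set of elements. They provide methods to add, discard, and remove elements, test membership, and compute unions and intersections, with the same distributed consistency as regular tables.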
241
242
```python { .api }
243
from typing import Any, Optional, Tuple


class SetTable:
    """Distributed table mapping each key to a set of elements."""

    def __init__(
        self,
        app: 'App',
        *,
        name: str,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        partitions: Optional[int] = None,
        changelog_topic: Optional['Topic'] = None,
        help: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Create a distributed set table.

        Args:
            app: The Faust application instance
            name: Table name
            key_type: Type for set keys
            value_type: Type for set elements
            partitions: Number of changelog partitions
            changelog_topic: Custom changelog topic
            help: Help text for CLI
        """

    def add(self, key: Any, value: Any) -> None:
        """
        Add element to set at key.

        Args:
            key: Set key
            value: Element to add
        """

    def discard(self, key: Any, value: Any) -> None:
        """
        Remove element from set at key if present.

        Unlike ``remove``, a missing element is not an error.

        Args:
            key: Set key
            value: Element to remove
        """

    def remove(self, key: Any, value: Any) -> None:
        """
        Remove element from set at key.

        Args:
            key: Set key
            value: Element to remove

        Raises:
            KeyError: If element not in set
        """

    def __contains__(self, item: Tuple[Any, Any]) -> bool:
        """
        Test membership of (key, value) pair.

        Args:
            item: (key, value) tuple to test

        Returns:
            True if value is in set at key
        """

    def intersection(self, key: Any, *others: Any) -> set:
        """
        Return intersection with other sets.

        Args:
            key: Set key
            *others: Other sets to intersect with

        Returns:
            Set intersection
        """

    def union(self, key: Any, *others: Any) -> set:
        """
        Return union with other sets.

        Args:
            key: Set key
            *others: Other sets to union with

        Returns:
            Set union
        """
333
334
class SetGlobalTable(SetTable, GlobalTable):
    """Global set table combining set operations with global access."""
337
```
338
339
### Data Models
340
341
Structured data classes for type-safe serialization and deserialization of messages and table values. Models provide schema validation, field typing, and automatic serialization support.
342
343
```python { .api }
344
from typing import Any, Dict, Optional


class Model:
    """Base class for structured, serializable data models."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Create model instance with field values.

        Args:
            *args: Positional field values
            **kwargs: Named field values
        """

    def dumps(self, *, serializer: Optional[str] = None) -> bytes:
        """
        Serialize model to bytes.

        Args:
            serializer: Serializer to use (defaults to model serializer)

        Returns:
            Serialized model data
        """

    @classmethod
    def loads(
        cls,
        s: bytes,
        *,
        serializer: Optional[str] = None,
        default_serializer: Optional[str] = None,
    ) -> 'Model':
        """
        Deserialize model from bytes.

        Args:
            s: Serialized data
            serializer: Serializer to use
            default_serializer: Fallback serializer

        Returns:
            Model instance
        """

    def asdict(self) -> Dict[str, Any]:
        """
        Convert model to dictionary.

        Returns:
            Dictionary representation of model
        """

    def derive(self, **fields: Any) -> 'Model':
        """
        Create new model instance with updated fields.

        Args:
            **fields: Fields to update

        Returns:
            New model instance with changes
        """

    @property
    def _options(self) -> 'ModelOptions':
        """Model configuration options."""
407
408
class Record(Model):
    """
    Record model with automatic field detection from type annotations.

    Example:
        class User(faust.Record):
            id: int
            name: str
            email: Optional[str] = None
    """

    def __init_subclass__(cls, **kwargs):
        """Initialize record subclass with field introspection."""
        # Forward class keyword options up the MRO so cooperative base
        # classes still receive them.
        super().__init_subclass__(**kwargs)
422
423
from typing import Any, Optional


class ModelOptions:
    """Configuration options attached to a model class."""

    def __init__(
        self,
        *,
        serializer: Optional[str] = None,
        include_metadata: bool = True,
        polymorphic_fields: bool = False,
        allow_blessed_key: bool = False,
        isodates: bool = False,
        decimals: bool = False,
        validation: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Model configuration options.

        Args:
            serializer: Default serializer
            include_metadata: Include type metadata in serialization
            polymorphic_fields: Support polymorphic field types
            allow_blessed_key: Allow blessed key optimization
            isodates: Parse ISO date strings to datetime objects
            decimals: Use decimal.Decimal for float fields
            validation: Enable field validation
        """

    @property
    def serializer(self) -> Optional[str]:
        """Default serializer name."""

    @property
    def include_metadata(self) -> bool:
        """Whether to include type metadata."""
456
```
457
458
### Field Types
459
460
Field descriptors and helper functions for declaring model fields, with optional automatic type coercion and per-field validation.
461
462
```python { .api }
463
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional
466
467
class FieldDescriptor:
    """Descriptor holding per-field options for model attributes."""

    def __init__(
        self,
        *,
        required: bool = True,
        default: Any = None,
        default_factory: Optional[Callable[[], Any]] = None,
        coerce: bool = True,
        validator: Optional[Callable[..., Any]] = None,
        exclude: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Field descriptor for model attributes.

        Args:
            required: Field is required (no None values)
            default: Default value
            default_factory: Zero-argument factory for default values
            coerce: Attempt type coercion
            validator: Validation function
            exclude: Exclude from serialization
        """
490
491
def DatetimeField(*, timezone: Optional[str] = None, **kwargs: Any) -> datetime:
    """
    Datetime field with timezone support.

    Args:
        timezone: Timezone name (e.g., 'UTC')
        **kwargs: Additional field options

    Returns:
        Field descriptor for datetime values
    """
502
503
def DecimalField(
    *,
    max_digits: Optional[int] = None,
    decimal_places: Optional[int] = None,
    **kwargs: Any,
) -> Decimal:
    """
    Decimal field for precise numeric values.

    Args:
        max_digits: Maximum number of digits
        decimal_places: Number of decimal places
        **kwargs: Additional field options

    Returns:
        Field descriptor for Decimal values
    """
515
516
class StringField(FieldDescriptor):
    """
    String field descriptor for text values.

    Provides validation and processing for string-type model fields.
    """
522
523
def maybe_model(arg: Any) -> Any:
    """
    Convert dictionary to model instance if it has model metadata.

    Checks if the argument is a dictionary with Faust model metadata
    and converts it to the appropriate model instance.

    Args:
        arg: Value to potentially convert to model

    Returns:
        Model instance if arg contains model metadata, otherwise arg unchanged
    """
536
537
#: Global registry of model classes by namespace.
#:
#: Maps model namespace strings to their corresponding model classes,
#: enabling deserialization of models from their serialized representations.
registry: Dict[str, type] = {}
544
```
545
546
## Usage Examples
547
548
### Basic Table Operations
549
550
```python
551
import heapq
from operator import itemgetter

import faust

app = faust.App('table-app', broker='kafka://localhost:9092')

# Create a table with default values: a missing key yields int() == 0,
# so ``+=`` works the first time a user is seen.
user_scores = app.Table('user-scores', default=int)


@app.agent()
async def update_scores(stream):
    async for event in stream:
        user_id = event['user_id']
        points = event['points']

        # Increment user score (read-modify-write through the table)
        user_scores[user_id] += points

        print(f"User {user_id} now has {user_scores[user_id]} points")


# Access table data
@app.timer(interval=30.0)
async def print_leaderboard():
    # nlargest keeps only the top 10 rather than sorting every entry;
    # the result is the same as sorted(..., reverse=True)[:10].
    # NOTE: a regular (non-global) Table presumably holds only the
    # partitions assigned to this worker, so this is a per-worker view.
    top_users = heapq.nlargest(10, user_scores.items(), key=itemgetter(1))

    for user_id, score in top_users:
        print(f"{user_id}: {score}")
580
```
581
582
### Windowed Tables
583
584
```python
585
from faust import TumblingWindow

# Table with time-based windows. The default factory is called per missing
# key, so each key gets its own fresh stats dict.
hourly_stats = app.Table(
    'hourly-stats',
    default=lambda: {'count': 0, 'total': 0},
    window=TumblingWindow(3600),  # 1 hour windows
)
# NOTE(review): upstream Faust usually builds windowed tables with
# ``app.Table(...).tumbling(3600)`` and reads values via ``.current()``;
# confirm that ``window=`` plus plain indexing behaves as intended here.


@app.agent()
async def collect_stats(stream):
    async for event in stream:
        key = event['category']
        value = event['value']

        # Update stats for current hour
        stats = hourly_stats[key]
        stats['count'] += 1
        stats['total'] += value
        # Write the dict back: mutating it in place never goes through
        # __setitem__, so presumably the change would not be replicated.
        hourly_stats[key] = stats
605
```
606
607
### Structured Data Models
608
609
```python
610
from datetime import datetime, timezone


# Faust record options are passed as class keyword arguments; an inner
# ``class Meta`` is not read as options. ``isodates=True`` parses ISO-8601
# strings back into datetime objects when JSON is deserialized.
class Order(faust.Record, serializer='json', isodates=True):
    id: int
    customer_id: str
    product_id: str
    quantity: int
    price: float
    timestamp: datetime


class OrderStatus(faust.Record, isodates=True):
    order_id: int
    status: str
    updated_at: datetime


# Use models with topics and tables
orders_topic = app.topic('orders', value_type=Order)
order_status_table = app.Table('order-status', value_type=OrderStatus)


@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        # Type-safe access to order fields
        print(f"Processing order {order.id} for {order.quantity} units")

        # Store order status; use an aware UTC timestamp
        # (datetime.utcnow() is naive and deprecated since Python 3.12).
        status = OrderStatus(
            order_id=order.id,
            status='processing',
            updated_at=datetime.now(timezone.utc),
        )
        order_status_table[order.id] = status
643
```
644
645
### Set Table Operations
646
647
```python
648
# Track user sessions: each user_id maps to the set of its active session ids
user_sessions = app.SetTable('user-sessions')


@app.agent()
async def track_sessions(events):
    async for event in events:
        user_id = event['user_id']
        session_id = event['session_id']
        action = event['action']

        if action == 'login':
            user_sessions.add(user_id, session_id)
        elif action == 'logout':
            # discard (not remove): a duplicate or unknown logout must not
            # raise KeyError and crash the agent.
            user_sessions.discard(user_id, session_id)
        # Any other action is ignored.


# Check active sessions
@app.timer(interval=60.0)
async def monitor_sessions():
    for user_id in user_sessions.keys():
        sessions = user_sessions[user_id]
        if len(sessions) > 5:
            print(f"User {user_id} has {len(sessions)} active sessions")
670
```
671
672
## Type Interfaces
673
674
```python { .api }
675
from typing import Protocol, Iterator, Any, Optional, Callable, Dict
676
677
class TableT(Protocol):
    """Type interface for Table."""

    # Attributes every table implementation exposes.
    name: str
    key_type: Optional[type]
    value_type: Optional[type]

    # Mapping-style item access.
    def __getitem__(self, key: Any) -> Any: ...
    def __setitem__(self, key: Any, value: Any) -> None: ...
    def __delitem__(self, key: Any) -> None: ...
    def __contains__(self, key: Any) -> bool: ...

    # Dict-like lookup and iteration helpers.
    def get(self, key: Any, default: Any = None) -> Any: ...
    def items(self) -> Iterator: ...
    def keys(self) -> Iterator: ...
    def values(self) -> Iterator: ...
693
694
class ModelT(Protocol):
    """Type interface for Model."""

    def dumps(self, *, serializer: Optional[str] = None) -> bytes: ...

    @classmethod
    def loads(cls, s: bytes, **kwargs: Any) -> 'ModelT': ...

    def asdict(self) -> Dict[str, Any]: ...
    def derive(self, **fields: Any) -> 'ModelT': ...
704
```