or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md · cli-framework.md · core-application.md · data-management.md · index.md · monitoring.md · serialization.md · stream-processing.md · topics-channels.md · windowing.md · worker-management.md

docs/data-management.md

0

# Data Management

1

2

Stateful data management through tables and models in Faust applications. Tables provide distributed key-value storage with changelog-based replication, while models offer structured data definitions with type-safe serialization and validation capabilities.

3

4

## Capabilities

5

6

### Table Storage

7

8

Distributed key-value storage interface with changelog-based replication for stateful stream processing. Tables automatically maintain consistency across application instances and provide both local and global access patterns.

9

10

```python { .api }

11

class Table:

12

def __init__(

13

self,

14

app: App,

15

*,

16

name: str,

17

default: callable = None,

18

key_type: type = None,

19

value_type: type = None,

20

partitions: int = None,

21

window: Window = None,

22

changelog_topic: Topic = None,

23

help: str = None,

24

**kwargs

25

):

26

"""

27

Create a new distributed table.

28

29

Args:

30

app: The Faust application instance

31

name: Table name (used for changelog topic)

32

default: Default value factory function

33

key_type: Type for table keys

34

value_type: Type for table values

35

partitions: Number of changelog partitions

36

window: Window specification for windowed tables

37

changelog_topic: Custom changelog topic

38

help: Help text for CLI

39

"""

40

41

def __getitem__(self, key: any) -> any:

42

"""

43

Get value by key.

44

45

Args:

46

key: Table key

47

48

Returns:

49

Value associated with key

50

51

Raises:

52

KeyError: If key not found and no default

53

"""

54

55

def __setitem__(self, key: any, value: any) -> None:

56

"""

57

Set key-value pair.

58

59

Args:

60

key: Table key

61

value: Value to store

62

"""

63

64

def __delitem__(self, key: any) -> None:

65

"""

66

Delete key from table.

67

68

Args:

69

key: Key to delete

70

71

Raises:

72

KeyError: If key not found

73

"""

74

75

def __contains__(self, key: any) -> bool:

76

"""

77

Check if key exists in table.

78

79

Args:

80

key: Key to check

81

82

Returns:

83

True if key exists

84

"""

85

86

def get(self, key: any, default: any = None) -> any:

87

"""

88

Get value by key with optional default.

89

90

Args:

91

key: Table key

92

default: Default value if key not found

93

94

Returns:

95

Value or default

96

"""

97

98

def setdefault(self, key: any, default: any = None) -> any:

99

"""

100

Get value or set and return default.

101

102

Args:

103

key: Table key

104

default: Default value to set if key missing

105

106

Returns:

107

Existing value or newly set default

108

"""

109

110

def pop(self, key: any, *default) -> any:

111

"""

112

Remove key and return value.

113

114

Args:

115

key: Key to remove

116

*default: Optional default if key not found

117

118

Returns:

119

Value that was removed

120

121

Raises:

122

KeyError: If key not found and no default provided

123

"""

124

125

def update(self, *args, **kwargs) -> None:

126

"""

127

Update table with key-value pairs.

128

129

Args:

130

*args: Mapping or iterable of pairs

131

**kwargs: Keyword arguments as key-value pairs

132

"""

133

134

def clear(self) -> None:

135

"""Remove all items from table."""

136

137

def items(self) -> Iterator:

138

"""

139

Iterate over key-value pairs.

140

141

Returns:

142

Iterator of (key, value) tuples

143

"""

144

145

def keys(self) -> Iterator:

146

"""

147

Iterate over keys.

148

149

Returns:

150

Iterator of keys

151

"""

152

153

def values(self) -> Iterator:

154

"""

155

Iterate over values.

156

157

Returns:

158

Iterator of values

159

"""

160

161

def copy(self) -> dict:

162

"""

163

Create a dictionary copy of table contents.

164

165

Returns:

166

Dictionary with current table state

167

"""

168

169

@property

170

def name(self) -> str:

171

"""Table name."""

172

173

@property

174

def default(self) -> callable:

175

"""Default value factory."""

176

177

@property

178

def key_type(self) -> type:

179

"""Type for table keys."""

180

181

@property

182

def value_type(self) -> type:

183

"""Type for table values."""

184

185

@property

186

def changelog_topic(self) -> Topic:

187

"""Changelog topic for replication."""

188

```

189

190

### Global Table

191

192

Global table providing read-only access to table data across all application instances, regardless of partition assignment. Useful for lookup tables and reference data that all instances need access to.

193

194

```python { .api }

195

class GlobalTable(Table):

196

def __init__(

197

self,

198

app: App,

199

*,

200

name: str,

201

default: callable = None,

202

key_type: type = None,

203

value_type: type = None,

204

changelog_topic: Topic = None,

205

help: str = None,

206

**kwargs

207

):

208

"""

209

Create a new global table with read access from all instances.

210

211

Args:

212

app: The Faust application instance

213

name: Table name

214

default: Default value factory function

215

key_type: Type for table keys

216

value_type: Type for table values

217

changelog_topic: Custom changelog topic

218

help: Help text for CLI

219

"""

220

221

def __setitem__(self, key: any, value: any) -> None:

222

"""

223

Set operations not supported on global tables.

224

225

Raises:

226

NotImplementedError: Global tables are read-only

227

"""

228

229

def __delitem__(self, key: any) -> None:

230

"""

231

Delete operations not supported on global tables.

232

233

Raises:

234

NotImplementedError: Global tables are read-only

235

"""

236

```

237

238

### Set Tables

239

240

Specialized table implementations for storing sets of values, providing set operations and membership testing with distributed consistency.

241

242

```python { .api }

243

class SetTable:

244

def __init__(

245

self,

246

app: App,

247

*,

248

name: str,

249

key_type: type = None,

250

value_type: type = None,

251

partitions: int = None,

252

changelog_topic: Topic = None,

253

help: str = None,

254

**kwargs

255

):

256

"""

257

Create a distributed set table.

258

259

Args:

260

app: The Faust application instance

261

name: Table name

262

key_type: Type for set keys

263

value_type: Type for set elements

264

partitions: Number of changelog partitions

265

changelog_topic: Custom changelog topic

266

help: Help text for CLI

267

"""

268

269

def add(self, key: any, value: any) -> None:

270

"""

271

Add element to set at key.

272

273

Args:

274

key: Set key

275

value: Element to add

276

"""

277

278

def discard(self, key: any, value: any) -> None:

279

"""

280

Remove element from set at key if present.

281

282

Args:

283

key: Set key

284

value: Element to remove

285

"""

286

287

def remove(self, key: any, value: any) -> None:

288

"""

289

Remove element from set at key.

290

291

Args:

292

key: Set key

293

value: Element to remove

294

295

Raises:

296

KeyError: If element not in set

297

"""

298

299

def __contains__(self, item: tuple) -> bool:

300

"""

301

Test membership of (key, value) pair.

302

303

Args:

304

item: (key, value) tuple to test

305

306

Returns:

307

True if value is in set at key

308

"""

309

310

def intersection(self, key: any, *others) -> set:

311

"""

312

Return intersection with other sets.

313

314

Args:

315

key: Set key

316

*others: Other sets to intersect with

317

318

Returns:

319

Set intersection

320

"""

321

322

def union(self, key: any, *others) -> set:

323

"""

324

Return union with other sets.

325

326

Args:

327

key: Set key

328

*others: Other sets to union with

329

330

Returns:

331

Set union

332

"""

333

334

class SetGlobalTable(SetTable, GlobalTable):

335

"""Global set table combining set operations with global access."""

336

pass

337

```

338

339

### Data Models

340

341

Structured data classes for type-safe serialization and deserialization of messages and table values. Models provide schema validation, field typing, and automatic serialization support.

342

343

```python { .api }

344

class Model:

345

def __init__(self, *args, **kwargs):

346

"""

347

Create model instance with field values.

348

349

Args:

350

*args: Positional field values

351

**kwargs: Named field values

352

"""

353

354

def dumps(self, *, serializer: str = None) -> bytes:

355

"""

356

Serialize model to bytes.

357

358

Args:

359

serializer: Serializer to use (defaults to model serializer)

360

361

Returns:

362

Serialized model data

363

"""

364

365

@classmethod

366

def loads(

367

cls,

368

s: bytes,

369

*,

370

serializer: str = None,

371

default_serializer: str = None

372

):

373

"""

374

Deserialize model from bytes.

375

376

Args:

377

s: Serialized data

378

serializer: Serializer to use

379

default_serializer: Fallback serializer

380

381

Returns:

382

Model instance

383

"""

384

385

def asdict(self) -> dict:

386

"""

387

Convert model to dictionary.

388

389

Returns:

390

Dictionary representation of model

391

"""

392

393

def derive(self, **fields):

394

"""

395

Create new model instance with updated fields.

396

397

Args:

398

**fields: Fields to update

399

400

Returns:

401

New model instance with changes

402

"""

403

404

@property

405

def _options(self) -> 'ModelOptions':

406

"""Model configuration options."""

407

408

class Record(Model):

409

"""

410

Record model with automatic field detection from type annotations.

411

412

Example:

413

class User(faust.Record):

414

id: int

415

name: str

416

email: str = None

417

"""

418

419

def __init_subclass__(cls, **kwargs):

420

"""Initialize record subclass with field introspection."""

421

super().__init_subclass__(**kwargs)

422

423

class ModelOptions:

424

def __init__(

425

self,

426

*,

427

serializer: str = None,

428

include_metadata: bool = True,

429

polymorphic_fields: bool = False,

430

allow_blessed_key: bool = False,

431

isodates: bool = False,

432

decimals: bool = False,

433

validation: bool = False,

434

**kwargs

435

):

436

"""

437

Model configuration options.

438

439

Args:

440

serializer: Default serializer

441

include_metadata: Include type metadata in serialization

442

polymorphic_fields: Support polymorphic field types

443

allow_blessed_key: Allow blessed key optimization

444

isodates: Parse ISO date strings to datetime objects

445

decimals: Use decimal.Decimal for float fields

446

validation: Enable field validation

447

"""

448

449

@property

450

def serializer(self) -> str:

451

"""Default serializer name."""

452

453

@property

454

def include_metadata(self) -> bool:

455

"""Whether to include type metadata."""

456

```

457

458

### Field Types

459

460

Type definitions for model fields, with support for automatic type conversion and validation.

461

462

```python { .api }

463

from typing import Optional, List, Dict, Any

464

from datetime import datetime

465

from decimal import Decimal

466

467

class FieldDescriptor:

468

def __init__(

469

self,

470

*,

471

required: bool = True,

472

default: Any = None,

473

default_factory: callable = None,

474

coerce: bool = True,

475

validator: callable = None,

476

exclude: bool = False,

477

**kwargs

478

):

479

"""

480

Field descriptor for model attributes.

481

482

Args:

483

required: Field is required (no None values)

484

default: Default value

485

default_factory: Factory for default values

486

coerce: Attempt type coercion

487

validator: Validation function

488

exclude: Exclude from serialization

489

"""

490

491

def DatetimeField(*, timezone: str = None, **kwargs) -> datetime:

492

"""

493

Datetime field with timezone support.

494

495

Args:

496

timezone: Timezone name (e.g., 'UTC')

497

**kwargs: Additional field options

498

499

Returns:

500

Field descriptor for datetime values

501

"""

502

503

def DecimalField(*, max_digits: int = None, decimal_places: int = None, **kwargs) -> Decimal:

504

"""

505

Decimal field for precise numeric values.

506

507

Args:

508

max_digits: Maximum number of digits

509

decimal_places: Number of decimal places

510

**kwargs: Additional field options

511

512

Returns:

513

Field descriptor for Decimal values

514

"""

515

516

class StringField(FieldDescriptor):

517

"""

518

String field descriptor for text values.

519

520

Provides validation and processing for string-type model fields.

521

"""

522

523

def maybe_model(arg: any) -> any:

524

"""

525

Convert dictionary to model instance if it has model metadata.

526

527

Checks if the argument is a dictionary with Faust model metadata

528

and converts it to the appropriate model instance.

529

530

Args:

531

arg: Value to potentially convert to model

532

533

Returns:

534

Model instance if arg contains model metadata, otherwise arg unchanged

535

"""

536

537

registry: dict = {}

538

"""

539

Global registry of model classes by namespace.

540

541

Maps model namespace strings to their corresponding model classes,

542

enabling deserialization of models from their serialized representations.

543

"""

544

```

545

546

## Usage Examples

547

548

### Basic Table Operations

549

550

```python

551

import faust

552

553

app = faust.App('table-app', broker='kafka://localhost:9092')

554

555

# Create a table with default values

556

user_scores = app.Table('user-scores', default=int)

557

558

@app.agent()

559

async def update_scores(stream):

560

async for event in stream:

561

user_id = event['user_id']

562

points = event['points']

563

564

# Increment user score

565

user_scores[user_id] += points

566

567

print(f"User {user_id} now has {user_scores[user_id]} points")

568

569

# Access table data

570

@app.timer(interval=30.0)

571

async def print_leaderboard():

572

top_users = sorted(

573

user_scores.items(),

574

key=lambda x: x[1],

575

reverse=True

576

)[:10]

577

578

for user_id, score in top_users:

579

print(f"{user_id}: {score}")

580

```

581

582

### Windowed Tables

583

584

```python

585

from faust import TumblingWindow

586

587

# Table with time-based windows

588

hourly_stats = app.Table(

589

'hourly-stats',

590

default=lambda: {'count': 0, 'total': 0},

591

window=TumblingWindow(3600) # 1 hour windows

592

)

593

594

@app.agent()

595

async def collect_stats(stream):

596

async for event in stream:

597

key = event['category']

598

value = event['value']

599

600

# Update stats for current hour

601

stats = hourly_stats[key]

602

stats['count'] += 1

603

stats['total'] += value

604

hourly_stats[key] = stats

605

```

606

607

### Structured Data Models

608

609

```python

610

class Order(faust.Record):

611

id: int

612

customer_id: str

613

product_id: str

614

quantity: int

615

price: float

616

timestamp: datetime

617

618

class Meta:

619

serializer = 'json'

620

621

class OrderStatus(faust.Record):

622

order_id: int

623

status: str

624

updated_at: datetime

625

626

# Use models with topics and tables

627

orders_topic = app.topic('orders', value_type=Order)

628

order_status_table = app.Table('order-status', value_type=OrderStatus)

629

630

@app.agent(orders_topic)

631

async def process_orders(orders):

632

async for order in orders:

633

# Type-safe access to order fields

634

print(f"Processing order {order.id} for {order.quantity} units")

635

636

# Store order status

637

status = OrderStatus(

638

order_id=order.id,

639

status='processing',

640

updated_at=datetime.utcnow()

641

)

642

order_status_table[order.id] = status

643

```

644

645

### Set Table Operations

646

647

```python

648

# Track user sessions

649

user_sessions = app.SetTable('user-sessions')

650

651

@app.agent()

652

async def track_sessions(events):

653

async for event in events:

654

user_id = event['user_id']

655

session_id = event['session_id']

656

action = event['action']

657

658

if action == 'login':

659

user_sessions.add(user_id, session_id)

660

elif action == 'logout':

661

user_sessions.discard(user_id, session_id)

662

663

# Check active sessions

664

@app.timer(interval=60.0)

665

async def monitor_sessions():

666

for user_id in user_sessions.keys():

667

sessions = user_sessions[user_id]

668

if len(sessions) > 5:

669

print(f"User {user_id} has {len(sessions)} active sessions")

670

```

671

672

## Type Interfaces

673

674

```python { .api }

675

from typing import Protocol, Iterator, Any, Optional, Callable, Dict

676

677

class TableT(Protocol):

678

"""Type interface for Table."""

679

680

name: str

681

key_type: Optional[type]

682

value_type: Optional[type]

683

684

def __getitem__(self, key: Any) -> Any: ...

685

def __setitem__(self, key: Any, value: Any) -> None: ...

686

def __delitem__(self, key: Any) -> None: ...

687

def __contains__(self, key: Any) -> bool: ...

688

689

def get(self, key: Any, default: Any = None) -> Any: ...

690

def items(self) -> Iterator: ...

691

def keys(self) -> Iterator: ...

692

def values(self) -> Iterator: ...

693

694

class ModelT(Protocol):

695

"""Type interface for Model."""

696

697

def dumps(self, *, serializer: Optional[str] = None) -> bytes: ...

698

699

@classmethod

700

def loads(cls, s: bytes, **kwargs) -> 'ModelT': ...

701

702

def asdict(self) -> Dict[str, Any]: ...

703

def derive(self, **fields) -> 'ModelT': ...

704

```