or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md · cli-framework.md · core-application.md · data-management.md · index.md · monitoring.md · serialization.md · stream-processing.md · topics-channels.md · windowing.md · worker-management.md

serialization.mddocs/

0

# Serialization

1

2

Data serialization and schema management for type-safe message handling in Faust applications. Provides codecs for different data formats, schema definitions for structured data, and flexible serialization pipelines with support for custom serializers and data transformation.

3

4

## Capabilities

5

6

### Schema Management

7

8

Schema definitions for structured data serialization and deserialization. Schemas provide type-safe message handling with automatic key/value serialization, custom serializer selection, and metadata preservation.

9

10

```python { .api }

from typing import Any, Callable, Optional


class Schema:
    """Message schema: key/value types plus the serializer used for each."""

    def __init__(
        self,
        *,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        allow_empty: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Create a message schema definition.

        Args:
            key_type: Type for message keys
            value_type: Type for message values
            key_serializer: Serializer name for keys
            value_serializer: Serializer name for values
            allow_empty: Allow None/empty values
        """

    def loads_key(
        self,
        app: "App",
        message: bytes,
        *,
        loads: Optional[Callable[..., Any]] = None,
        serializer: Optional[str] = None,
    ) -> Any:
        """
        Deserialize message key.

        Args:
            app: Faust application instance
            message: Raw message data
            loads: Custom deserialization function
            serializer: Override serializer

        Returns:
            Deserialized key object
        """

    def loads_value(
        self,
        app: "App",
        message: bytes,
        *,
        loads: Optional[Callable[..., Any]] = None,
        serializer: Optional[str] = None,
    ) -> Any:
        """
        Deserialize message value.

        Args:
            app: Faust application instance
            message: Raw message data
            loads: Custom deserialization function
            serializer: Override serializer

        Returns:
            Deserialized value object
        """

    def dumps_key(
        self,
        app: "App",
        key: Any,
        *,
        serializer: Optional[str] = None,
    ) -> bytes:
        """
        Serialize message key.

        Args:
            app: Faust application instance
            key: Key object to serialize
            serializer: Override serializer

        Returns:
            Serialized key bytes
        """

    def dumps_value(
        self,
        app: "App",
        value: Any,
        *,
        serializer: Optional[str] = None,
    ) -> bytes:
        """
        Serialize message value.

        Args:
            app: Faust application instance
            value: Value object to serialize
            serializer: Override serializer

        Returns:
            Serialized value bytes
        """

    @property
    def key_type(self) -> Optional[type]:
        """Type for message keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for message values."""

    @property
    def key_serializer(self) -> Optional[str]:
        """Serializer name for keys."""

    @property
    def value_serializer(self) -> Optional[str]:
        """Serializer name for values."""

```

129

130

### Codec Interface

131

132

Low-level serialization codec interface for implementing custom data formats and transformation pipelines. Codecs handle the actual byte-level encoding and decoding operations.

133

134

```python { .api }

from typing import Any


class Codec:
    """Base interface for codecs that convert Python objects to and from bytes."""

    def __init__(self, **kwargs: Any) -> None:
        """
        Create a serialization codec.

        Args:
            **kwargs: Codec-specific configuration
        """

    def encode(self, obj: Any) -> bytes:
        """
        Encode object to bytes.

        Args:
            obj: Object to encode

        Returns:
            Encoded bytes

        Raises:
            SerializationError: If encoding fails
        """

    def decode(self, data: bytes) -> Any:
        """
        Decode bytes to object.

        Args:
            data: Bytes to decode

        Returns:
            Decoded object

        Raises:
            SerializationError: If decoding fails
        """

    @property
    def mime_type(self) -> str:
        """MIME type for this codec."""

```

176

177

### Built-in Codecs

178

179

Ready-made codecs for common data formats: JSON, pickle, raw bytes (pass-through), and binary. Use the pickle codec only with trusted producers: unpickling data from an untrusted source can execute arbitrary code.

180

181

```python { .api }

from typing import Any, Optional, Tuple


class JSONCodec(Codec):
    """Codec that serializes objects as JSON."""

    def __init__(
        self,
        *,
        ensure_ascii: bool = False,
        sort_keys: bool = False,
        separators: Optional[Tuple[str, str]] = None,
        **kwargs: Any,
    ) -> None:
        """
        JSON serialization codec.

        Args:
            ensure_ascii: Escape non-ASCII characters
            sort_keys: Sort dictionary keys
            separators: Custom separators as an (item, key) pair
        """

    def encode(self, obj: Any) -> bytes:
        """Encode object as JSON bytes."""

    def decode(self, data: bytes) -> Any:
        """Decode JSON bytes to object."""


class PickleCodec(Codec):
    """Codec that serializes objects with Python's pickle module."""

    # SECURITY: unpickling can execute arbitrary code. Only decode payloads
    # from trusted producers; prefer JSON for data crossing trust boundaries.

    def __init__(self, *, protocol: Optional[int] = None, **kwargs: Any) -> None:
        """
        Python pickle serialization codec.

        Args:
            protocol: Pickle protocol version
        """

    def encode(self, obj: Any) -> bytes:
        """Encode object using pickle."""

    def decode(self, data: bytes) -> Any:
        """Decode pickle bytes to object."""


class RawCodec(Codec):
    """Codec that passes bytes through unchanged."""

    def encode(self, obj: bytes) -> bytes:
        """Pass-through for raw bytes."""

    def decode(self, data: bytes) -> bytes:
        """Pass-through for raw bytes."""


class BinaryCodec(Codec):
    """Codec for binary payloads."""

    # NOTE(review): the exact encoding is not specified here (upstream Faust's
    # ``binary`` codec is base64) -- confirm before relying on the wire format.

    def encode(self, obj: Any) -> bytes:
        """Encode as binary representation."""

    def decode(self, data: bytes) -> Any:
        """Decode from binary representation."""

```

235

236

### Registry Management

237

238

Codec registry for managing available serializers and their configuration. Provides dynamic codec selection, registration of custom codecs, and serializer name resolution.

239

240

```python { .api }

from typing import List, Type, Union


class Registry:
    """Registry that maps serializer names to codecs."""

    def __init__(self) -> None:
        """Create codec registry with built-in codecs."""

    def register(self, name: str, codec: Union[Codec, Type[Codec]]) -> None:
        """
        Register a codec with given name.

        Args:
            name: Codec name for lookup
            codec: Codec instance or class
        """

    def get(self, name: str) -> Codec:
        """
        Get codec by name.

        Args:
            name: Codec name

        Returns:
            Codec instance

        Raises:
            KeyError: If codec not found
        """

    def list_codecs(self) -> List[str]:
        """
        List all registered codec names.

        Returns:
            List of codec names
        """


# Default codec registry
codecs = Registry()


def register_codec(name: str, codec: Union[Codec, Type[Codec]]) -> None:
    """
    Register codec in default registry.

    Args:
        name: Codec name
        codec: Codec instance or class
    """


def get_codec(name: str) -> Codec:
    """
    Get codec from default registry.

    Args:
        name: Codec name

    Returns:
        Codec instance
    """
    # NOTE(review): presumably delegates to ``codecs.get`` and therefore raises
    # KeyError for unknown names -- confirm and document under "Raises".

```

299

300

### Serializer Configuration

301

302

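Configuration utilities for setting serialization behavior at the application, topic, and message level. Each level inherits from the broader one and can override it: message-level serializers override topic-level serializers, which in turn override the application defaults.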

303

304

```python { .api }

from typing import Any, Optional


class SerializerSettings:
    """Default key/value serializer configuration."""

    def __init__(
        self,
        *,
        key: Optional[str] = None,
        value: Optional[str] = None,
        allow_empty: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Serializer configuration settings.

        Args:
            key: Default key serializer name
            value: Default value serializer name
            allow_empty: Allow empty/None values
        """
        # NOTE(review): allow_empty defaults to True here but to False on
        # Schema -- confirm the difference is intentional.

    @property
    def key_serializer(self) -> str:
        """Default key serializer."""

    @property
    def value_serializer(self) -> str:
        """Default value serializer."""


def configure_serializers(
    app: "App",
    *,
    key: Optional[str] = None,
    value: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Configure default serializers for application.

    Args:
        app: Faust application
        key: Default key serializer
        value: Default value serializer
    """

```

347

348

### Custom Serializers

349

350

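Base class for implementing custom serialization formats. On top of the `Codec` interface, `CustomCodec` provides hooks for validating an object before encoding (`validate`) and for transforming it before encoding (`transform_encode`) and after decoding (`transform_decode`). Failures are reported with `SerializationError` and `SchemaError`.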

351

352

```python { .api }

from typing import Any


class CustomCodec(Codec):
    """Codec base class with validation and encode/decode transform hooks."""

    def __init__(self, **config: Any) -> None:
        """
        Base class for custom codecs.

        Args:
            **config: Codec configuration, forwarded unchanged to ``Codec.__init__``
        """
        super().__init__(**config)

    def validate(self, obj: Any) -> bool:
        """
        Validate object before serialization.

        Args:
            obj: Object to validate

        Returns:
            True if valid for this codec
        """

    def transform_encode(self, obj: Any) -> Any:
        """
        Transform object before encoding.

        Args:
            obj: Object to transform

        Returns:
            Transformed object
        """

    def transform_decode(self, obj: Any) -> Any:
        """
        Transform object after decoding.

        Args:
            obj: Decoded object

        Returns:
            Transformed object
        """

class SerializationError(Exception):
    """Raised when serialization/deserialization fails."""


class SchemaError(Exception):
    """Raised when schema validation fails."""

```

404

405

## Usage Examples

406

407

### Basic Serialization

408

409

```python

import faust
from faust import JSONCodec, PickleCodec

app = faust.App('serialization-app', broker='kafka://localhost:9092')

# Topic with JSON serialization
json_topic = app.topic(
    'json-events',
    value_type=dict,
    value_serializer='json',
)

# Topic with pickle serialization.
# Only consume pickled data from trusted producers: unpickling can run code.
pickle_topic = app.topic(
    'pickle-data',
    value_serializer='pickle',
)


@app.agent(json_topic)
async def handle_json_events(events):
    async for event in events:
        # Automatically deserialized from JSON
        print(f"Event type: {event['type']}, data: {event['data']}")


# `await` is only valid inside a coroutine, so sending happens in one.
async def send_login_event():
    # Send JSON data
    await json_topic.send(value={
        'type': 'user_login',
        'data': {'user_id': 123, 'timestamp': '2024-01-01T00:00:00Z'},
    })

```

440

441

### Custom Schema Definition

442

443

```python

from faust import Schema
from datetime import datetime

# Assumes `app` is an existing faust.App instance.


class EventSchema(Schema):
    """Schema for the 'events' topic: raw string keys, JSON dict values."""

    def __init__(self):
        super().__init__(
            key_type=str,
            value_type=dict,
            key_serializer='raw',
            value_serializer='json',
        )


# Topic with custom schema
events_topic = app.topic(
    'events',
    schema=EventSchema(),
)


@app.agent(events_topic)
async def process_events(stream):
    # Iterating a stream directly yields only the decoded values; .events()
    # yields Event objects that carry both the key and the value.
    async for event in stream.events():
        # Keys are strings, values are dicts
        key = event.key  # Already deserialized
        data = event.value  # Already deserialized
        print(f"Processing {key}: {data}")

```

470

471

### Custom Codec Implementation

472

473

```python

import gzip
import json

import faust
from faust import Codec

# Assumes `app` is an existing faust.App instance.
# NOTE(review): upstream Faust's ``faust.serializers.codecs.Codec`` expects
# subclasses to implement ``_dumps``/``_loads`` and exposes registration as
# ``faust.serializers.codecs.register`` -- confirm which interface the
# installed version provides.


class CompressedJSONCodec(Codec):
    """JSON codec with gzip compression."""

    def encode(self, obj):
        """Serialize *obj* to UTF-8 JSON, then gzip-compress the bytes."""
        json_bytes = json.dumps(obj).encode('utf-8')
        return gzip.compress(json_bytes)

    def decode(self, data):
        """Decompress gzip *data*, then parse the UTF-8 JSON it contains."""
        json_bytes = gzip.decompress(data)
        return json.loads(json_bytes.decode('utf-8'))

    @property
    def mime_type(self):
        return 'application/json+gzip'


# Register custom codec
faust.codecs.register('compressed_json', CompressedJSONCodec())

# Use custom codec
compressed_topic = app.topic(
    'compressed-data',
    value_serializer='compressed_json',
)

```

502

503

### Model-based Serialization

504

505

```python

from datetime import datetime, timezone

import faust

# Assumes `app` is an existing faust.App instance.


# isodates=True asks Faust to parse ISO-8601 strings back into datetime
# objects on decode; without it, datetime fields may come back as strings.
class User(faust.Record, isodates=True, serializer='json'):
    id: int
    name: str
    email: str
    created_at: datetime


class UserEvent(faust.Record, isodates=True, serializer='json'):
    user: User
    event_type: str
    timestamp: datetime


# Topics with model types
users_topic = app.topic('users', value_type=User)
events_topic = app.topic('user-events', value_type=UserEvent)


@app.agent(users_topic)
async def handle_users(users):
    async for user in users:
        # Automatic deserialization to User model
        print(f"User {user.id}: {user.name} ({user.email})")

        # Create related event. Use an aware UTC timestamp:
        # datetime.utcnow() is deprecated and returns a naive datetime.
        event = UserEvent(
            user=user,
            event_type='created',
            timestamp=datetime.now(timezone.utc),
        )
        await events_topic.send(value=event)

```

535

536

### Advanced Serialization Configuration

537

538

```python

import faust

# Configure app-wide serializer defaults
app = faust.App(
    'my-app',
    broker='kafka://localhost:9092',
    key_serializer='raw',
    value_serializer='json',
)

# Topic with custom serializers
special_topic = app.topic(
    'special-data',
    key_type=dict,  # keys are dicts, matching the key sent below
    value_type=bytes,
    key_serializer='json',  # Override app default
    value_serializer='raw',  # Override app default
)


# `await` is only valid inside a coroutine, so sending happens in one.
async def send_special_data():
    # Message-level serializer override.
    # NOTE(review): a consumer presumably decodes with the topic's serializers
    # ('json'/'raw') unless told otherwise, so per-message overrides like these
    # must be matched on the consuming side -- confirm.
    await special_topic.send(
        key={'user_id': 123},
        value=b'raw binary data',
        key_serializer='pickle',  # Override topic default
        value_serializer='binary',  # Override topic default
    )

```

564

565

### Error Handling

566

567

```python

import base64

from faust import SerializationError

# Assumes `app` is an existing faust.App instance.

# Topic that collects events which failed processing.
dead_letter_topic = app.topic('dead-letters', value_serializer='json')


def process_data(data):
    """Application-specific processing (placeholder)."""


@app.agent()
async def resilient_processor(stream):
    # Iterating a stream directly yields only decoded values; .events()
    # yields Event objects that also carry the key and the underlying message.
    # NOTE(review): failures decoding the inbound message itself may surface
    # before the event reaches this loop (upstream Faust reports them as
    # KeyDecodeError/ValueDecodeError) -- confirm for the installed version.
    async for event in stream.events():
        try:
            # Process the event
            process_data(event.value)
        except SerializationError as e:
            # Handle serialization errors
            print(f"Serialization error: {e}")
            # Forward to the dead letter queue. The raw payload is undecoded
            # bytes, which JSON cannot encode, so base64-encode it first.
            raw = event.message.value
            raw_data = base64.b64encode(raw).decode('ascii') if raw is not None else None
            await dead_letter_topic.send(
                key=event.key,
                value={'error': str(e), 'raw_data': raw_data},
            )

```

586

587

## Type Interfaces

588

589

```python { .api }

from typing import Protocol, Any, Optional, Union, Dict


class CodecT(Protocol):
    """Type interface for Codec."""

    # Declared as a read-only property rather than a plain attribute, so that
    # implementations exposing ``mime_type`` via ``@property`` (as Codec does)
    # satisfy the protocol under static type checkers.
    @property
    def mime_type(self) -> str: ...

    def encode(self, obj: Any) -> bytes: ...

    def decode(self, data: bytes) -> Any: ...

class SchemaT(Protocol):
    """Type interface for Schema."""

    # Read-only properties (not plain attributes) so that Schema's
    # @property accessors satisfy the protocol under static type checkers.
    @property
    def key_type(self) -> Optional[type]: ...

    @property
    def value_type(self) -> Optional[type]: ...

    @property
    def key_serializer(self) -> Optional[str]: ...

    @property
    def value_serializer(self) -> Optional[str]: ...

    def loads_key(self, app: Any, message: bytes, **kwargs: Any) -> Any: ...

    def loads_value(self, app: Any, message: bytes, **kwargs: Any) -> Any: ...

    def dumps_key(self, app: Any, key: Any, **kwargs: Any) -> bytes: ...

    def dumps_value(self, app: Any, value: Any, **kwargs: Any) -> bytes: ...

class RegistryT(Protocol):
    """Type interface for codec Registry."""

    def register(self, name: str, codec: CodecT) -> None: ...

    def get(self, name: str) -> CodecT: ...

    def list_codecs(self) -> list: ...

```