or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mdcli-framework.mdcore-application.mddata-management.mdindex.mdmonitoring.mdserialization.mdstream-processing.mdtopics-channels.mdwindowing.mdworker-management.md

core-application.mddocs/

0

# Core Application Framework

1

2

The foundational components for creating and managing Faust applications. The App class serves as the central orchestrator, providing decorators and methods for defining stream processing logic, data storage, web endpoints, and CLI commands.

3

4

## Capabilities

5

6

### Application Class

7

8

The main Faust application class that coordinates all components including agents, topics, tables, services, and web endpoints. Acts as the entry point for building distributed stream processing applications.

9

10

```python { .api }

11

class App:

12

def __init__(

13

self,

14

id: str,

15

*,

16

broker: str = None,

17

autodiscover: bool = True,

18

origin: str = None,

19

canonical_url: str = None,

20

broker_consumer: str = None,

21

broker_producer: str = None,

22

cache: str = None,

23

web: str = None,

24

web_enabled: bool = True,

25

web_transport: str = None,

26

web_cors_options: dict = None,

27

logging_config: dict = None,

28

loghandlers: list = None,

29

datadir: str = None,

30

tabledir: str = None,

31

debug: bool = None,

32

quiet: bool = None,

33

no_color: bool = None,

34

blocking_timeout: float = 10.0,

35

broker_heartbeat: float = 0.0,

36

broker_commit_interval: float = 2.8,

37

broker_commit_livelock_soft_timeout: float = 300.0,

38

broker_commit_livelock_hard_timeout: float = 600.0,

39

broker_session_timeout: float = None,

40

broker_request_timeout: float = None,

41

broker_retry_backoff_type: str = 'exponential',

42

broker_retry_max_delay: float = 32.0,

43

broker_retry_delay: float = 0.1,

44

broker_api_version: str = 'auto',

45

topic_replication_factor: int = None,

46

topic_partitions: int = None,

47

topic_allow_declare: bool = True,

48

topic_disable_leader: bool = False,

49

id_format: str = '{id}-{self.version}',

50

stream_buffer_maxsize: int = 4096,

51

stream_wait_empty: bool = True,

52

stream_ack_cancelled_tasks: bool = True,

53

stream_ack_exceptions: bool = True,

54

stream_publish_on_commit: bool = False,

55

stream_recovery_delay: float = 10.0,

56

producer_linger: float = 0.0,

57

producer_max_batch_size: int = 16384,

58

producer_acks: int = -1,

59

producer_max_request_size: int = 1000000,

60

producer_compression_type: str = None,

61

producer_partitioner: str = None,

62

producer_request_timeout: float = None,

63

producer_api_version: str = None,

64

consumer_max_fetch_size: int = 4194304,

65

consumer_auto_offset_reset: str = 'earliest',

66

consumer_connections_max_idle: float = None,

67

consumer_request_timeout: float = None,

68

consumer_api_version: str = None,

69

consumer_session_timeout: float = None,

70

consumer_heartbeat_interval: float = None,

71

consumer_max_poll_records: int = None,

72

consumer_max_poll_interval: float = None,

73

consumer_rebalance_timeout: float = None,

74

consumer_group_instance_id: str = None,

75

web_bind: str = 'localhost',

76

web_port: int = 6066,

77

web_host: str = None,

78

web_in_thread: bool = False,

79

worker_redirect_stdouts: bool = None,

80

worker_redirect_stdouts_level: int = None,

81

reply_to: str = None,

82

reply_to_prefix: str = None,

83

reply_expires: float = 120.0,

84

reply_create_topic: bool = False,

85

ssl_context: object = None,

86

store_check_exists: bool = True,

87

table_cleanup_interval: float = 30.0,

88

table_key_index_size: int = 1000,

89

table_standby_replicas: int = 1,

90

timezone: str = None,

91

**kwargs

92

):

93

"""

94

Create a new Faust application.

95

96

Args:

97

id: Unique application identifier

98

broker: Kafka broker URL (e.g., 'kafka://localhost:9092')

99

autodiscover: Enable automatic discovery of agents and tasks

100

datadir: Directory for storing application data

101

web_enabled: Enable web server for HTTP endpoints

102

web_port: Port for web server

103

topic_partitions: Default number of partitions for new topics

104

**kwargs: Additional configuration options

105

"""

106

```

107

108

Usage Example:

109

110

```python

111

import faust

112

113

# Basic application

114

app = faust.App('my-app', broker='kafka://localhost:9092')

115

116

# Application with custom configuration

117

app = faust.App(

118

'my-app',

119

broker='kafka://localhost:9092',

120

web_port=8080,

121

topic_partitions=8,

122

datadir='/var/faust-data',

123

logging_config={'level': 'INFO'}

124

)

125

```

126

127

### Agent Decorator

128

129

Decorator for creating stream processing agents that consume from channels or topics. Agents are async functions that process streams of data with automatic scaling and fault tolerance.

130

131

```python { .api }

132

def agent(

133

self,

134

channel=None,

135

*,

136

name: str = None,

137

concurrency: int = 1,

138

sink: list = None,

139

on_error: callable = None,

140

supervisor_strategy: str = None,

141

help: str = None,

142

**kwargs

143

):

144

"""

145

Decorator to define a stream processing agent.

146

147

Args:

148

channel: Channel or topic to consume from

149

name: Agent name (defaults to function name)

150

concurrency: Number of concurrent instances

151

sink: List of channels to forward results to

152

on_error: Error handler function

153

supervisor_strategy: Strategy for handling agent failures

154

help: Help text for CLI

155

**kwargs: Additional agent options

156

157

Returns:

158

Agent decorator function

159

"""

160

```

161

162

Usage Example:

163

164

```python

165

# Basic agent

166

@app.agent(app.topic('orders'))

167

async def process_orders(orders):

168

async for order in orders:

169

print(f'Processing order: {order}')

170

171

# Agent with concurrency and error handling

172

@app.agent(

173

app.topic('payments'),

174

concurrency=5,

175

on_error=lambda agent, exc: print(f'Error: {exc}')

176

)

177

async def process_payments(payments):

178

async for payment in payments:

179

# Process payment

180

await process_payment(payment)

181

```

182

183

### Topic Definition

184

185

Method for defining Kafka topics with type safety, serialization, and partitioning configuration.

186

187

```python { .api }

188

def topic(

189

self,

190

topic: str,

191

*,

192

key_type: type = None,

193

value_type: type = None,

194

key_serializer: str = None,

195

value_serializer: str = None,

196

partitions: int = None,

197

retention: float = None,

198

compacting: bool = None,

199

deleting: bool = None,

200

replicas: int = None,

201

acks: bool = True,

202

delivery_guarantee: str = 'at_least_once',

203

maxsize: int = None,

204

root: str = None,

205

config: dict = None,

206

**kwargs

207

):

208

"""

209

Define a Kafka topic for the application.

210

211

Args:

212

topic: Topic name

213

key_type: Type for message keys

214

value_type: Type for message values

215

key_serializer: Serializer for keys ('json', 'raw', etc.)

216

value_serializer: Serializer for values ('json', 'pickle', etc.)

217

partitions: Number of partitions

218

retention: Message retention time in seconds

219

compacting: Enable log compaction

220

replicas: Replication factor

221

acks: Require broker acknowledgment

222

delivery_guarantee: 'at_least_once', 'at_most_once', 'exactly_once'

223

config: Additional Kafka topic configuration

224

225

Returns:

226

Topic object

227

"""

228

```

229

230

Usage Example:

231

232

```python

233

# Basic topic

234

orders_topic = app.topic('orders', value_type=str)

235

236

# Typed topic with custom serialization

237

from faust import Record

238

239

class Order(Record):

240

id: int

241

amount: float

242

customer: str

243

244

orders_topic = app.topic(

245

'orders',

246

key_type=int,

247

value_type=Order,

248

partitions=16,

249

retention=86400.0 # 24 hours

250

)

251

```

252

253

### Table Creation

254

255

Method for creating distributed key-value tables with changelog-based replication and windowing support.

256

257

```python { .api }

258

def table(

259

self,

260

name: str,

261

*,

262

default: callable = None,

263

window: object = None,

264

partitions: int = None,

265

help: str = None,

266

**kwargs

267

):

268

"""

269

Create a distributed table for stateful processing.

270

271

Args:

272

name: Table name

273

default: Default value factory function

274

window: Window specification for windowed tables

275

partitions: Number of partitions

276

help: Help text for CLI

277

**kwargs: Additional table options

278

279

Returns:

280

Table object

281

"""

282

```

283

284

Usage Example:

285

286

```python

287

# Basic table

288

word_counts = app.Table('word-counts', default=int)

289

290

# Table with custom default

291

user_profiles = app.Table('user-profiles', default=dict)

292

293

# Windowed table for time-based aggregation

294

from faust import TumblingWindow

295

296

windowed_counts = app.Table(

297

'hourly-counts',

298

default=int,

299

window=TumblingWindow(3600.0) # 1 hour windows

300

)

301

```

302

303

### Timer Decorator

304

305

Decorator for creating periodic background tasks that execute at regular intervals.

306

307

```python { .api }

308

def timer(

309

self,

310

seconds: float,

311

*,

312

on_error: callable = None,

313

**kwargs

314

):

315

"""

316

Decorator for periodic timer tasks.

317

318

Args:

319

seconds: Interval between executions in seconds

320

on_error: Error handler function

321

**kwargs: Additional timer options

322

323

Returns:

324

Timer decorator function

325

"""

326

```

327

328

Usage Example:

329

330

```python

331

# Basic timer

332

@app.timer(interval=30.0)

333

async def cleanup_task():

334

print("Running cleanup...")

335

# Cleanup logic here

336

337

# Timer with error handling

338

@app.timer(

339

interval=60.0,

340

on_error=lambda timer, exc: print(f'Timer error: {exc}')

341

)

342

async def health_check():

343

# Health check logic

344

await check_system_health()

345

```

346

347

### Cron Job Decorator

348

349

Decorator for creating cron-style scheduled tasks using cron expressions.

350

351

```python { .api }

352

def crontab(

353

self,

354

cron_format: str,

355

*,

356

timezone: str = None,

357

on_error: callable = None,

358

**kwargs

359

):

360

"""

361

Decorator for cron-scheduled tasks.

362

363

Args:

364

cron_format: Cron expression (e.g., '0 */2 * * *')

365

timezone: Timezone for scheduling

366

on_error: Error handler function

367

**kwargs: Additional cron options

368

369

Returns:

370

Cron decorator function

371

"""

372

```

373

374

Usage Example:

375

376

```python

377

# Daily task at midnight

378

@app.crontab('0 0 * * *')

379

async def daily_report():

380

print("Generating daily report...")

381

382

# Every 15 minutes with timezone

383

@app.crontab('*/15 * * * *', timezone='UTC')

384

async def sync_data():

385

await synchronize_external_data()

386

```

387

388

### Web Page Decorator

389

390

Decorator for creating HTTP endpoints and web pages integrated with the Faust web server.

391

392

```python { .api }

393

def page(

394

self,

395

path: str,

396

*,

397

base: object = None,

398

cors_options: dict = None,

399

name: str = None

400

):

401

"""

402

Decorator for HTTP endpoints.

403

404

Args:

405

path: URL path pattern

406

base: Base view class

407

cors_options: CORS configuration

408

name: Endpoint name

409

410

Returns:

411

Web page decorator function

412

"""

413

```

414

415

Usage Example:

416

417

```python

418

# Basic web endpoint

419

@app.page('/health')

420

async def health_check(web, request):

421

return web.json({'status': 'healthy'})

422

423

# Endpoint with parameters

424

@app.page('/orders/{order_id}')

425

async def get_order(web, request, order_id):

426

order = await get_order_by_id(order_id)

427

return web.json(order.asdict())

428

```

429

430

### CLI Command Decorator

431

432

Decorator for creating application-specific CLI commands integrated with the Faust command-line interface.

433

434

```python { .api }

435

def command(

436

self,

437

*options,

438

base: object = None,

439

**kwargs

440

):

441

"""

442

Decorator for CLI commands.

443

444

Args:

445

*options: Click command options

446

base: Base command class

447

**kwargs: Additional command options

448

449

Returns:

450

Command decorator function

451

"""

452

```

453

454

Usage Example:

455

456

```python

457

import click

458

459

# Basic command

460

@app.command()

461

async def hello():

462

print("Hello from Faust!")

463

464

# Command with arguments

465

@app.command(

466

click.argument('name'),

467

click.option('--count', default=1, help='Number of greetings')

468

)

469

async def greet(name, count):

470

for i in range(count):

471

print(f"Hello {name}!")

472

```

473

474

### Application Lifecycle

475

476

Methods for managing the application lifecycle including startup, shutdown, and main execution.

477

478

```python { .api }

479

def main(self):

480

"""

481

Main entry point for running the application.

482

Handles command-line arguments and starts the application.

483

"""

484

485

async def start(self):

486

"""

487

Start the application and all its services.

488

"""

489

490

async def stop(self):

491

"""

492

Stop the application and clean up resources.

493

"""

494

495

def loop(self):

496

"""

497

Get the asyncio event loop for the application.

498

499

Returns:

500

Event loop instance

501

"""

502

```

503

504

Usage Example:

505

506

```python

507

# Standard application entry point

508

if __name__ == '__main__':

509

app.main()

510

511

# Programmatic control

512

import asyncio

513

514

async def run_app():

515

await app.start()

516

# Application running...

517

await app.stop()

518

519

asyncio.run(run_app())

520

```

521

522

## Type Interfaces

523

524

```python { .api }

525

from typing import Protocol

526

527

class AppT(Protocol):

528

"""Type interface for Faust applications."""

529

id: str

530

broker: str

531

532

def agent(self, channel=None, **kwargs): ...

533

def topic(self, topic: str, **kwargs): ...

534

def table(self, name: str, **kwargs): ...

535

def main(self): ...

536

```