0
# Core Application Framework
1
2
The foundational components for creating and managing Faust applications. The App class serves as the central orchestrator, providing decorators and methods for defining stream processing logic, data storage, web endpoints, and CLI commands.
3
4
## Capabilities
5
6
### Application Class
7
8
The main Faust application class that coordinates all components including agents, topics, tables, services, and web endpoints. Acts as the entry point for building distributed stream processing applications.
9
10
```python { .api }
11
class App:
12
def __init__(
13
self,
14
id: str,
15
*,
16
broker: str = None,
17
autodiscover: bool = True,
18
origin: str = None,
19
canonical_url: str = None,
20
broker_consumer: str = None,
21
broker_producer: str = None,
22
cache: str = None,
23
web: str = None,
24
web_enabled: bool = True,
25
web_transport: str = None,
26
web_cors_options: dict = None,
27
logging_config: dict = None,
28
loghandlers: list = None,
29
datadir: str = None,
30
tabledir: str = None,
31
debug: bool = None,
32
quiet: bool = None,
33
no_color: bool = None,
34
blocking_timeout: float = 10.0,
35
broker_heartbeat: float = 0.0,
36
broker_commit_interval: float = 2.8,
37
broker_commit_livelock_soft_timeout: float = 300.0,
38
broker_commit_livelock_hard_timeout: float = 600.0,
39
broker_session_timeout: float = None,
40
broker_request_timeout: float = None,
41
broker_retry_backoff_type: str = 'exponential',
42
broker_retry_max_delay: float = 32.0,
43
broker_retry_delay: float = 0.1,
44
broker_api_version: str = 'auto',
45
topic_replication_factor: int = None,
46
topic_partitions: int = None,
47
topic_allow_declare: bool = True,
48
topic_disable_leader: bool = False,
49
id_format: str = '{id}-{self.version}',
50
stream_buffer_maxsize: int = 4096,
51
stream_wait_empty: bool = True,
52
stream_ack_cancelled_tasks: bool = True,
53
stream_ack_exceptions: bool = True,
54
stream_publish_on_commit: bool = False,
55
stream_recovery_delay: float = 10.0,
56
producer_linger: float = 0.0,
57
producer_max_batch_size: int = 16384,
58
producer_acks: int = -1,
59
producer_max_request_size: int = 1000000,
60
producer_compression_type: str = None,
61
producer_partitioner: str = None,
62
producer_request_timeout: float = None,
63
producer_api_version: str = None,
64
consumer_max_fetch_size: int = 4194304,
65
consumer_auto_offset_reset: str = 'earliest',
66
consumer_connections_max_idle: float = None,
67
consumer_request_timeout: float = None,
68
consumer_api_version: str = None,
69
consumer_session_timeout: float = None,
70
consumer_heartbeat_interval: float = None,
71
consumer_max_poll_records: int = None,
72
consumer_max_poll_interval: float = None,
73
consumer_rebalance_timeout: float = None,
74
consumer_group_instance_id: str = None,
75
web_bind: str = 'localhost',
76
web_port: int = 6066,
77
web_host: str = None,
78
web_in_thread: bool = False,
79
worker_redirect_stdouts: bool = None,
80
worker_redirect_stdouts_level: int = None,
81
reply_to: str = None,
82
reply_to_prefix: str = None,
83
reply_expires: float = 120.0,
84
reply_create_topic: bool = False,
85
ssl_context: object = None,
86
store_check_exists: bool = True,
87
table_cleanup_interval: float = 30.0,
88
table_key_index_size: int = 1000,
89
table_standby_replicas: int = 1,
90
timezone: str = None,
91
**kwargs
92
):
93
"""
94
Create a new Faust application.
95
96
Args:
97
id: Unique application identifier
98
broker: Kafka broker URL (e.g., 'kafka://localhost:9092')
99
autodiscover: Enable automatic discovery of agents and tasks
100
datadir: Directory for storing application data
101
web_enabled: Enable web server for HTTP endpoints
102
web_port: Port for web server
103
topic_partitions: Default number of partitions for new topics
104
**kwargs: Additional configuration options
105
"""
106
```
107
108
Usage Example:
109
110
```python
111
import faust
112
113
# Basic application
114
app = faust.App('my-app', broker='kafka://localhost:9092')
115
116
# Application with custom configuration
117
app = faust.App(
118
'my-app',
119
broker='kafka://localhost:9092',
120
web_port=8080,
121
topic_partitions=8,
122
datadir='/var/faust-data',
123
logging_config={'level': 'INFO'}
124
)
125
```
126
127
### Agent Decorator
128
129
Decorator for creating stream processing agents that consume from channels or topics. Agents are async functions that process streams of data with automatic scaling and fault tolerance.
130
131
```python { .api }
132
def agent(
133
self,
134
channel=None,
135
*,
136
name: str = None,
137
concurrency: int = 1,
138
sink: list = None,
139
on_error: callable = None,
140
supervisor_strategy: str = None,
141
help: str = None,
142
**kwargs
143
):
144
"""
145
Decorator to define a stream processing agent.
146
147
Args:
148
channel: Channel or topic to consume from
149
name: Agent name (defaults to function name)
150
concurrency: Number of concurrent instances
151
sink: List of channels to forward results to
152
on_error: Error handler function
153
supervisor_strategy: Strategy for handling agent failures
154
help: Help text for CLI
155
**kwargs: Additional agent options
156
157
Returns:
158
Agent decorator function
159
"""
160
```
161
162
Usage Example:
163
164
```python
165
# Basic agent
166
@app.agent(app.topic('orders'))
167
async def process_orders(orders):
168
async for order in orders:
169
print(f'Processing order: {order}')
170
171
# Agent with concurrency and error handling
172
@app.agent(
173
app.topic('payments'),
174
concurrency=5,
175
on_error=lambda agent, exc: print(f'Error: {exc}')
176
)
177
async def process_payments(payments):
178
async for payment in payments:
179
# Process payment
180
await process_payment(payment)
181
```
182
183
### Topic Definition
184
185
Method for defining Kafka topics with type safety, serialization, and partitioning configuration.
186
187
```python { .api }
188
def topic(
189
self,
190
topic: str,
191
*,
192
key_type: type = None,
193
value_type: type = None,
194
key_serializer: str = None,
195
value_serializer: str = None,
196
partitions: int = None,
197
retention: float = None,
198
compacting: bool = None,
199
deleting: bool = None,
200
replicas: int = None,
201
acks: bool = True,
202
delivery_guarantee: str = 'at_least_once',
203
maxsize: int = None,
204
root: str = None,
205
config: dict = None,
206
**kwargs
207
):
208
"""
209
Define a Kafka topic for the application.
210
211
Args:
212
topic: Topic name
213
key_type: Type for message keys
214
value_type: Type for message values
215
key_serializer: Serializer for keys ('json', 'raw', etc.)
216
value_serializer: Serializer for values ('json', 'pickle', etc.)
217
partitions: Number of partitions
218
retention: Message retention time in seconds
219
compacting: Enable log compaction
220
replicas: Replication factor
221
acks: Require broker acknowledgment
222
delivery_guarantee: 'at_least_once', 'at_most_once', 'exactly_once'
223
config: Additional Kafka topic configuration
224
225
Returns:
226
Topic object
227
"""
228
```
229
230
Usage Example:
231
232
```python
233
# Basic topic
234
orders_topic = app.topic('orders', value_type=str)
235
236
# Typed topic with custom serialization
237
from faust import Record
238
239
class Order(Record):
240
id: int
241
amount: float
242
customer: str
243
244
orders_topic = app.topic(
245
'orders',
246
key_type=int,
247
value_type=Order,
248
partitions=16,
249
retention=86400.0 # 24 hours
250
)
251
```
252
253
### Table Creation
254
255
Method for creating distributed key-value tables with changelog-based replication and windowing support.
256
257
```python { .api }
258
def table(
259
self,
260
name: str,
261
*,
262
default: callable = None,
263
window: object = None,
264
partitions: int = None,
265
help: str = None,
266
**kwargs
267
):
268
"""
269
Create a distributed table for stateful processing.
270
271
Args:
272
name: Table name
273
default: Default value factory function
274
window: Window specification for windowed tables
275
partitions: Number of partitions
276
help: Help text for CLI
277
**kwargs: Additional table options
278
279
Returns:
280
Table object
281
"""
282
```
283
284
Usage Example:
285
286
```python
287
# Basic table
288
word_counts = app.Table('word-counts', default=int)
289
290
# Table with custom default
291
user_profiles = app.Table('user-profiles', default=dict)
292
293
# Windowed table for time-based aggregation
294
from faust import TumblingWindow
295
296
windowed_counts = app.Table(
297
'hourly-counts',
298
default=int,
299
window=TumblingWindow(3600.0) # 1 hour windows
300
)
301
```
302
303
### Timer Decorator
304
305
Decorator for creating periodic background tasks that execute at regular intervals.
306
307
```python { .api }
308
def timer(
309
self,
310
seconds: float,
311
*,
312
on_error: callable = None,
313
**kwargs
314
):
315
"""
316
Decorator for periodic timer tasks.
317
318
Args:
319
seconds: Interval between executions in seconds
320
on_error: Error handler function
321
**kwargs: Additional timer options
322
323
Returns:
324
Timer decorator function
325
"""
326
```
327
328
Usage Example:
329
330
```python
331
# Basic timer
332
@app.timer(interval=30.0)
333
async def cleanup_task():
334
print("Running cleanup...")
335
# Cleanup logic here
336
337
# Timer with error handling
338
@app.timer(
339
interval=60.0,
340
on_error=lambda timer, exc: print(f'Timer error: {exc}')
341
)
342
async def health_check():
343
# Health check logic
344
await check_system_health()
345
```
346
347
### Cron Job Decorator
348
349
Decorator for creating cron-style scheduled tasks using cron expressions.
350
351
```python { .api }
352
def crontab(
353
self,
354
cron_format: str,
355
*,
356
timezone: str = None,
357
on_error: callable = None,
358
**kwargs
359
):
360
"""
361
Decorator for cron-scheduled tasks.
362
363
Args:
364
cron_format: Cron expression (e.g., '0 */2 * * *')
365
timezone: Timezone for scheduling
366
on_error: Error handler function
367
**kwargs: Additional cron options
368
369
Returns:
370
Cron decorator function
371
"""
372
```
373
374
Usage Example:
375
376
```python
377
# Daily task at midnight
378
@app.crontab('0 0 * * *')
379
async def daily_report():
380
print("Generating daily report...")
381
382
# Every 15 minutes with timezone
383
@app.crontab('*/15 * * * *', timezone='UTC')
384
async def sync_data():
385
await synchronize_external_data()
386
```
387
388
### Web Page Decorator
389
390
Decorator for creating HTTP endpoints and web pages integrated with the Faust web server.
391
392
```python { .api }
393
def page(
394
self,
395
path: str,
396
*,
397
base: object = None,
398
cors_options: dict = None,
399
name: str = None
400
):
401
"""
402
Decorator for HTTP endpoints.
403
404
Args:
405
path: URL path pattern
406
base: Base view class
407
cors_options: CORS configuration
408
name: Endpoint name
409
410
Returns:
411
Web page decorator function
412
"""
413
```
414
415
Usage Example:
416
417
```python
418
# Basic web endpoint
419
@app.page('/health')
420
async def health_check(web, request):
421
return web.json({'status': 'healthy'})
422
423
# Endpoint with parameters
424
@app.page('/orders/{order_id}')
425
async def get_order(web, request, order_id):
426
order = await get_order_by_id(order_id)
427
return web.json(order.asdict())
428
```
429
430
### CLI Command Decorator
431
432
Decorator for creating application-specific CLI commands integrated with the Faust command-line interface.
433
434
```python { .api }
435
def command(
436
self,
437
*options,
438
base: object = None,
439
**kwargs
440
):
441
"""
442
Decorator for CLI commands.
443
444
Args:
445
*options: Click command options
446
base: Base command class
447
**kwargs: Additional command options
448
449
Returns:
450
Command decorator function
451
"""
452
```
453
454
Usage Example:
455
456
```python
457
import click
458
459
# Basic command
460
@app.command()
461
async def hello():
462
print("Hello from Faust!")
463
464
# Command with arguments
465
@app.command(
466
click.argument('name'),
467
click.option('--count', default=1, help='Number of greetings')
468
)
469
async def greet(name, count):
470
for i in range(count):
471
print(f"Hello {name}!")
472
```
473
474
### Application Lifecycle
475
476
Methods for managing the application lifecycle including startup, shutdown, and main execution.
477
478
```python { .api }
479
def main(self):
480
"""
481
Main entry point for running the application.
482
Handles command-line arguments and starts the application.
483
"""
484
485
async def start(self):
486
"""
487
Start the application and all its services.
488
"""
489
490
async def stop(self):
491
"""
492
Stop the application and clean up resources.
493
"""
494
495
def loop(self):
496
"""
497
Get the asyncio event loop for the application.
498
499
Returns:
500
Event loop instance
501
"""
502
```
503
504
Usage Example:
505
506
```python
507
# Standard application entry point
508
if __name__ == '__main__':
509
app.main()
510
511
# Programmatic control
512
import asyncio
513
514
async def run_app():
515
await app.start()
516
# Application running...
517
await app.stop()
518
519
asyncio.run(run_app())
520
```
521
522
## Type Interfaces
523
524
```python { .api }
525
from typing import Protocol
526
527
class AppT(Protocol):
528
"""Type interface for Faust applications."""
529
id: str
530
broker: str
531
532
def agent(self, channel=None, **kwargs): ...
533
def topic(self, topic: str, **kwargs): ...
534
def table(self, name: str, **kwargs): ...
535
def main(self): ...
536
```