# Faust

A Python stream processing library that ports the ideas from Kafka Streams to Python. Faust enables building distributed systems and real-time data pipelines that process streams of data with state management, automatic failover, and changelog-based replication.

## Package Information

- **Package Name**: faust
- **Language**: Python
- **Python Version**: >=3.6.0
- **Installation**: `pip install faust`
- **License**: BSD-3-Clause

## Core Imports

```python
import faust
```

Common imports for stream processing:

```python
from faust import App, Agent, Stream, Table, Record
```

## Basic Usage

```python
import faust

# Create a Faust application
app = faust.App('word-count-app', broker='kafka://localhost:9092')

# Define a data model
class WordCount(faust.Record):
    word: str
    count: int

# Create a topic and a table
words_topic = app.topic('words', value_type=str)
word_counts_table = app.Table('word-counts', default=int)

# Define a stream processing agent
@app.agent(words_topic)
async def count_words(words):
    async for word in words:
        # Tables are sharded by key: with more than one worker, messages should be
        # keyed by word so every update for a word lands on the same partition.
        word_counts_table[word] += 1

# Define a timer to print current counts
@app.timer(interval=10.0)
async def print_counts():
    for word, count in word_counts_table.items():
        print(f'{word}: {count}')

# Run the application
if __name__ == '__main__':
    app.main()
```

Because `app.main()` hands control to the Faust command line, the script is started as a worker, e.g. `python word_count.py worker -l info` if it is saved as `word_count.py`.

## Architecture

Faust follows a decorator-based architecture centered around the `App` class:

- **App**: Main application container that manages agents, topics, tables, and services
- **Agents**: Stream processing functions decorated with `@app.agent()` that consume from channels
- **Topics**: Named channels backed by Kafka topics for message distribution
- **Streams**: Async iterators over events in channels with transformation capabilities
- **Tables**: Distributed key-value stores with changelog-based replication for stateful processing
- **Events**: Message containers with key, value, headers, and acknowledgment capabilities
- **Models**: Structured data classes for type-safe serialization/deserialization

Because agents are ordinary `async def` functions, stream processing code can call any Python library directly, while Kafka partitioning and changelog-backed tables let an application scale out across workers and recover its state after a failure.

## Capabilities

### Core Application Framework

The foundational components for creating and managing Faust applications, including the main `App` class and the decorators and factory methods for defining agents, topics, tables, timers, web endpoints, and CLI commands.

```python { .api }
class App:
    def __init__(self, id: str, *, broker: str = None, **kwargs): ...
    def agent(self, channel=None, *, name=None, concurrency=1, **kwargs): ...
    def topic(self, *topics: str, key_type=None, value_type=None, **kwargs): ...
    def Table(self, name: str, *, default=None, window=None, **kwargs): ...
    def timer(self, interval: float, on_leader: bool = False, **kwargs): ...
    def crontab(self, cron_format: str, *, timezone=None, **kwargs): ...
    def command(self, *options, base=None, **kwargs): ...
    def page(self, path: str, *, cors_options=None, **kwargs): ...
    def main(self): ...
```

[Core Application](./core-application.md)
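
The decorators listed above register functions with the application rather than running them. A worker started through `app.main()` drives them later. The toy `MiniApp` below is a hypothetical, self-contained sketch of that registration pattern. It is not Faust's implementation and it needs no Kafka. `agent()` and `timer()` only record the decorated function, and a stand-in `deliver()` method plays the worker's role by feeding values to an agent as an async iterator.

```python
import asyncio
from typing import AsyncIterator, Callable, Dict, Iterable, List, Tuple


class MiniApp:
    """Hypothetical toy app: its decorators only record functions in registries."""

    def __init__(self, app_id: str) -> None:
        self.app_id = app_id
        self.agents: Dict[str, Callable] = {}
        self.timers: List[Tuple[float, Callable]] = []

    def agent(self, channel: str) -> Callable:
        def register(fun: Callable) -> Callable:
            self.agents[channel] = fun  # remember which function consumes this channel
            return fun
        return register

    def timer(self, interval: float) -> Callable:
        def register(fun: Callable) -> Callable:
            self.timers.append((interval, fun))  # scheduling is left to a worker
            return fun
        return register

    async def deliver(self, channel: str, values: Iterable) -> None:
        """Stand-in for a worker: feed values to the agent registered on channel."""
        async def stream() -> AsyncIterator:
            for value in values:
                yield value
        await self.agents[channel](stream())


app = MiniApp('demo')
greetings: List[str] = []


@app.agent('names')
async def greet(names):
    async for name in names:
        greetings.append(f'hello {name}')


@app.timer(interval=10.0)
async def report():
    print(greetings)


asyncio.run(app.deliver('names', ['ada', 'bob']))
print(greetings)  # ['hello ada', 'hello bob']
```

Unlike this sketch, Faust's `@app.agent` returns an `Agent` object that wraps the function. That object is what provides `send()`, `ask()` and `cast()`.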

### Stream Processing

Stream processing components, including agents for consuming data streams, stream transformation operations, and event handling for building real-time data processing pipelines.

```python { .api }
from typing import Optional

class Agent:
    async def send(self, *, key=None, value=None, partition=None, **kwargs): ...
    async def ask(self, value=None, *, key=None, reply_to=None, **kwargs): ...
    async def cast(self, value=None, *, key=None, partition=None): ...

class Stream:
    def filter(self, fun): ...
    def map(self, fun): ...
    def group_by(self, key, *, name=None): ...
    def take(self, max_: int, *, within=None): ...
    def rate_limit(self, rate: float, *, per=1.0): ...
    def through(self, channel, **kwargs): ...

class Event:
    def ack(self): ...
    def reject(self): ...
    async def send(self, channel, key=None, value=None, **kwargs): ...
    async def forward(self, channel, *, key=None, value=None, **kwargs): ...

def current_event() -> Optional[Event]: ...
```

[Stream Processing](./stream-processing.md)
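
`Agent.ask()` is a request/reply call. The request travels with a `reply_to` destination and a correlation id, and the caller waits for the reply that carries the same id. The sketch below models only that pairing, in memory with `asyncio`. `ReplyingWorker`, `run()` and the doubling handler are hypothetical names, and no Kafka topics are involved.

```python
import asyncio
import itertools
from typing import Any, Callable, Dict, Tuple


class ReplyingWorker:
    """Hypothetical in-memory stand-in for an agent that answers ask() calls."""

    def __init__(self, handler: Callable[[Any], Any]) -> None:
        self._handler = handler
        self._requests: "asyncio.Queue[Tuple[int, Any]]" = asyncio.Queue()
        self._pending: Dict[int, "asyncio.Future[Any]"] = {}
        self._ids = itertools.count()

    async def run(self) -> None:
        """Consume requests forever, replying under each request's correlation id."""
        while True:
            correlation_id, value = await self._requests.get()
            reply = self._handler(value)
            self._pending.pop(correlation_id).set_result(reply)

    async def ask(self, value: Any) -> Any:
        """Send value with a fresh correlation id and wait for the matching reply."""
        correlation_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._requests.put((correlation_id, value))
        return await future


async def main() -> list:
    # Created inside the running loop so the queue binds to the right loop.
    worker = ReplyingWorker(lambda x: x * 2)
    task = asyncio.create_task(worker.run())
    replies = await asyncio.gather(worker.ask(1), worker.ask(2), worker.ask(3))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return replies


print(asyncio.run(main()))  # [2, 4, 6]
```

Each reply is matched by id rather than by arrival order, so concurrent `ask()` calls each receive their own result.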
121
122
### Topics and Channels
123
124
Topic and channel management for message distribution, including configuration, partitioning, serialization, and integration with Kafka infrastructure.
125
126
```python { .api }
127
class Topic:
128
async def send(self, key=None, value=None, *, partition=None, **kwargs): ...
129
def send_soon(self, key=None, value=None, *, partition=None, **kwargs): ...
130
def stream(self, **kwargs) -> Stream: ...
131
def events(self, **kwargs) -> Stream: ...
132
def get_partition_key(self, key, partition): ...
133
134
class Channel:
135
async def send(self, value=None, *, key=None, partition=None, **kwargs): ...
136
def send_soon(self, value=None, *, key=None, partition=None, **kwargs): ...
137
def stream(self, **kwargs) -> Stream: ...
138
def events(self, **kwargs) -> Stream: ...
139
```
140
141
[Topics and Channels](./topics-channels.md)
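
When a message is sent with a key and no explicit `partition`, the producer chooses the partition by hashing the key. Every message with the same key therefore goes to the same partition, which is why stateful agents usually consume streams keyed by the same key as the table they update. The minimal sketch below uses `zlib.crc32` as the hash purely for illustration. Kafka's default partitioner uses murmur2, so real partition numbers will differ.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a serialized message key to a partition number.

    Assumption: crc32 stands in for Kafka's murmur2 hash; only the
    "same key -> same partition" property matters here.
    """
    return zlib.crc32(key) % num_partitions


keys = [b'apple', b'banana', b'apple', b'cherry', b'banana']
for key in keys:
    # Repeated keys always print the same partition number.
    print(key.decode(), '->', choose_partition(key, 4))
```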

### Data Management

Stateful data management through tables and models, including distributed key-value storage, windowing operations, and structured data modeling with serialization.

```python { .api }
from typing import Any

class Table:
    def __getitem__(self, key): ...
    def __setitem__(self, key, value): ...
    def get(self, key, default=None): ...
    def setdefault(self, key, default=None): ...
    def pop(self, key, *default): ...
    def items(self): ...
    def keys(self): ...
    def values(self): ...
    def clear(self): ...

class GlobalTable(Table): ...
class SetTable: ...
class SetGlobalTable: ...

class Model:
    def dumps(self, *, serializer=None) -> bytes: ...
    @classmethod
    def loads(cls, s: bytes, *, serializer=None): ...
    def asdict(self) -> dict: ...
    def derive(self, **fields): ...

class Record(Model): ...
class ModelOptions: ...

class FieldDescriptor: ...
class StringField(FieldDescriptor): ...

def maybe_model(arg) -> Any: ...

registry: dict = {}
```

[Data Management](./data-management.md)
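
Tables get their fault tolerance from a changelog. Every write is also recorded in a compacted Kafka topic, and a worker that takes over a partition rebuilds the table by replaying that topic. The class below is a hypothetical, in-memory model of the idea, with a list standing in for the changelog topic. It is not Faust's `Table` implementation.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple


class ChangelogTable:
    """Hypothetical in-memory model of a changelog-backed key/value table."""

    def __init__(self, default: Optional[Callable[[], Any]] = None) -> None:
        self.default = default
        self.data: Dict[Any, Any] = {}
        # Stands in for the changelog topic: one (key, value) record per write.
        self.changelog: List[Tuple[Any, Any]] = []

    def __getitem__(self, key: Any) -> Any:
        if key in self.data:
            return self.data[key]
        if self.default is None:
            raise KeyError(key)
        # Missing keys yield a fresh default; nothing is logged until a write.
        return self.default()

    def __setitem__(self, key: Any, value: Any) -> None:
        self.data[key] = value
        self.changelog.append((key, value))

    def compacted(self) -> List[Tuple[Any, Any]]:
        """Keep only the latest value per key, as log compaction would."""
        return list(dict(self.changelog).items())

    @classmethod
    def recover(cls, records: Iterable[Tuple[Any, Any]],
                default: Optional[Callable[[], Any]] = None) -> "ChangelogTable":
        """Rebuild a replica by replaying changelog records in order."""
        table = cls(default=default)
        for key, value in records:
            table.data[key] = value
        return table


counts = ChangelogTable(default=int)
for word in ['a', 'b', 'a']:
    counts[word] += 1

replica = ChangelogTable.recover(counts.compacted(), default=int)
print(counts.changelog)  # [('a', 1), ('b', 1), ('a', 2)]
print(replica.data)      # {'a': 2, 'b': 1}
```

Replaying the compacted log gives the same state as replaying the full log, because only the latest value per key affects the result.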

### Serialization

Data serialization and schema management for type-safe message handling, including codecs for different data formats and schema definitions for structured data.

```python { .api }
class Schema:
    def loads_key(self, app, message, *, loads=None, serializer=None): ...
    def loads_value(self, app, message, *, loads=None, serializer=None): ...
    def dumps_key(self, app, key, *, serializer=None) -> bytes: ...
    def dumps_value(self, app, value, *, serializer=None) -> bytes: ...

class Codec:
    def dumps(self, obj) -> bytes: ...
    def loads(self, s: bytes): ...
```

[Serialization](./serialization.md)
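
Codecs turn objects into bytes and back, and Faust lets codec names be combined with `|` (for example JSON followed by the Base64 `binary` codec). The classes below are hypothetical stand-ins written with the standard library. They show how a chain encodes left to right and decodes in reverse. For the real codec registry, see the Serialization page.

```python
import base64
import json
from typing import Any


class SketchCodec:
    """Hypothetical codec interface: dumps() produces bytes, loads() reverses it."""

    def dumps(self, obj: Any) -> bytes:
        raise NotImplementedError

    def loads(self, data: bytes) -> Any:
        raise NotImplementedError

    def __or__(self, other: "SketchCodec") -> "SketchCodec":
        # a | b: encode with a first, then pass the resulting bytes through b.
        return ChainedCodec(self, other)


class JSONCodec(SketchCodec):
    def dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode('utf-8')

    def loads(self, data: bytes) -> Any:
        return json.loads(data.decode('utf-8'))


class Base64Codec(SketchCodec):
    def dumps(self, obj: bytes) -> bytes:
        return base64.b64encode(obj)

    def loads(self, data: bytes) -> bytes:
        return base64.b64decode(data)


class ChainedCodec(SketchCodec):
    def __init__(self, first: SketchCodec, second: SketchCodec) -> None:
        self.first = first
        self.second = second

    def dumps(self, obj: Any) -> bytes:
        return self.second.dumps(self.first.dumps(obj))

    def loads(self, data: bytes) -> Any:
        # Decode in reverse order: undo the outer codec first.
        return self.first.loads(self.second.loads(data))


json_then_base64 = JSONCodec() | Base64Codec()
payload = json_then_base64.dumps({'word': 'hello', 'count': 1})
print(payload)
print(json_then_base64.loads(payload))  # {'word': 'hello', 'count': 1}
```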

### Authentication

Security and authentication mechanisms for secure connections to Kafka brokers, including SSL, SASL, and GSSAPI credential management.

```python { .api }
class SSLCredentials:
    def __init__(self, *, context=None, purpose=None, cafile=None, **kwargs): ...

class SASLCredentials:
    def __init__(self, *, mechanism=None, username=None, password=None, **kwargs): ...

class GSSAPICredentials:
    def __init__(self, *, kerberos_service_name='kafka', **kwargs): ...
```

[Authentication](./authentication.md)
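
`SSLCredentials` wraps a standard `ssl.SSLContext`. The sketch below builds such a context with the standard library. The certificate paths and the lines that pass the context to Faust are placeholders left as comments. The `broker_credentials` setting name is an assumption here, so check it against the Authentication page.

```python
import ssl

# Verify the broker's certificate against the system's default CA store.
# If the broker uses a private CA, pass cafile='path/to/ca.pem' (placeholder path).
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)

# For mutual TLS the client certificate would also be loaded; the paths are
# placeholders, so the call is left commented out:
# context.load_cert_chain('client.cert', keyfile='client.key')

# Passing the context to Faust (not executed; `broker_credentials` is assumed):
# app = faust.App('secure-app', broker='kafka://localhost:9092',
#                 broker_credentials=faust.SSLCredentials(context=context))

print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)  # True True
```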
217
218
### Windowing
219
220
Time-based windowing operations for temporal data aggregation, including tumbling, hopping, and sliding window implementations for stream analytics.
221
222
```python { .api }
223
class Window: ...
224
225
class TumblingWindow(Window):
226
def __init__(self, size: float, *, expires=None): ...
227
228
class HoppingWindow(Window):
229
def __init__(self, size: float, step: float, *, expires=None): ...
230
231
class SlidingWindow(Window):
232
def __init__(self, before: float, after: float, *, expires=None): ...
233
```
234
235
[Windowing](./windowing.md)
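
The three window types differ in which time ranges an event falls into:

- A tumbling window assigns each timestamp to exactly one fixed slot.
- A hopping window of length `size` that advances by `step` assigns it to every overlapping slot.
- A sliding window is built around the event itself.

The functions below are a hypothetical sketch of that arithmetic. They use half-open ranges `[start, end)` for tumbling and hopping windows, and assume a sliding window spans `before` seconds before and `after` seconds after the event, as the constructor arguments suggest. Faust's own boundary handling may differ.

```python
import math
from typing import List, Tuple

Range = Tuple[float, float]


def tumbling_range(timestamp: float, size: float) -> Range:
    """The single fixed-size, non-overlapping window [start, end) holding timestamp."""
    start = math.floor(timestamp / size) * size
    return (start, start + size)


def hopping_ranges(timestamp: float, size: float, step: float) -> List[Range]:
    """All windows [k*step, k*step + size) that contain timestamp."""
    last = math.floor(timestamp / step)                # latest window start <= timestamp
    first = math.floor((timestamp - size) / step) + 1  # earliest window whose end > timestamp
    return [(k * step, k * step + size) for k in range(first, last + 1)]


def sliding_range(timestamp: float, before: float, after: float) -> Range:
    """A window around the event: [timestamp - before, timestamp + after]."""
    return (timestamp - before, timestamp + after)


print(tumbling_range(12, 10))     # (10, 20)
print(hopping_ranges(12, 10, 5))  # [(5, 15), (10, 20)]
print(sliding_range(12, 3, 2))    # (9, 14)
```

With `size=10` and `step=5`, every timestamp falls into exactly two hopping windows, because each window overlaps the next by half its length.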

### Monitoring and Sensors

Monitoring, metrics collection, and sensor framework for observability, including custom sensor implementations and integration with monitoring systems.

```python { .api }
class Sensor:
    def on_message_in(self, tp, offset, message): ...
    def on_message_out(self, tp, offset, message): ...
    def on_table_get(self, table, key): ...
    def on_table_set(self, table, key, value): ...
    def on_commit_completed(self, consumer, state): ...

class Monitor(Sensor): ...
```

[Monitoring](./monitoring.md)
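
A sensor is a set of callbacks that Faust invokes as messages and table operations flow through the app. The class below is a hypothetical stand-in that reuses a few hook names from the `Sensor` stub above to keep per-partition counters. It assumes `on_message_out` fires once a message has finished processing. In a real application you would subclass `faust.Sensor` instead. Registering it, for example through `app.sensors`, is an assumption to check against the Monitoring page.

```python
from collections import Counter
from typing import Any, Hashable


class CountingSensor:
    """Hypothetical stand-in that mirrors a few faust.Sensor hook names."""

    def __init__(self) -> None:
        self.messages_in: Counter = Counter()
        self.messages_out: Counter = Counter()
        self.table_sets: Counter = Counter()

    def on_message_in(self, tp: Hashable, offset: int, message: Any) -> None:
        self.messages_in[tp] += 1

    def on_message_out(self, tp: Hashable, offset: int, message: Any) -> None:
        self.messages_out[tp] += 1

    def on_table_set(self, table: Any, key: Any, value: Any) -> None:
        self.table_sets[table] += 1

    def in_flight(self, tp: Hashable) -> int:
        """Messages received on tp that have not finished processing yet."""
        return self.messages_in[tp] - self.messages_out[tp]


sensor = CountingSensor()
tp = ('words', 0)  # hypothetical (topic, partition) pair
for offset in range(3):
    sensor.on_message_in(tp, offset, message=None)
sensor.on_message_out(tp, 0, message=None)
sensor.on_table_set('word-counts', 'hello', 1)
print(sensor.in_flight(tp))  # 2
```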

### CLI Framework

Command-line interface framework for building application-specific commands, including argument parsing, option handling, and integration with the Faust CLI system.

```python { .api }
class Command:
    def run(self, *args, **kwargs): ...

class AppCommand(Command):
    def run(self, *args, **kwargs): ...

class argument:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

class option:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

def call_command(command: str, args=None, **kwargs) -> tuple: ...
```

[CLI Framework](./cli-framework.md)

### Worker Management

Worker process management and service coordination: starting, stopping, and restarting the worker that runs an application, built on the asynchronous `Service` lifecycle.

```python { .api }
class Worker:
    async def start(self): ...
    async def stop(self): ...
    async def restart(self): ...

class Service: ...
class ServiceT: ...
```

[Worker Management](./worker-management.md)

## Type Interfaces

Faust provides abstract interface classes (named with a `T` suffix) for static type checking:

```python { .api }
import abc

class AppT(abc.ABC): ...
class AgentT(abc.ABC): ...
class ChannelT(abc.ABC): ...
class EventT(abc.ABC): ...
class StreamT(abc.ABC): ...
class TopicT(abc.ABC): ...
class ServiceT(abc.ABC): ...
```

## Configuration

Application configuration through the `Settings` class:

```python { .api }
class Settings:
    # Configuration options for brokers, serialization, etc.
    pass
```

## Utilities

Utility functions and helpers:

```python { .api }
def uuid() -> str: ...