tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/faust@1.10.x

To install, run:

npx @tessl/cli install tessl/pypi-faust@1.10.0

# Faust

A Python stream processing library that ports the ideas from Kafka Streams to Python. Faust enables building distributed systems and real-time data pipelines that process streams of data with state management, automatic failover, and changelog-based replication.

## Package Information

- **Package Name**: faust
- **Language**: Python
- **Python Version**: >=3.6.0
- **Installation**: `pip install faust`
- **License**: BSD-3-Clause

## Core Imports

```python
import faust
```

Common imports for stream processing:

```python
from faust import App, Agent, Stream, Table, Record
```

## Basic Usage

```python
import faust

# Create a Faust application
app = faust.App('word-count-app', broker='kafka://localhost:9092')

# Define a data model
class WordCount(faust.Record):
    word: str
    count: int

# Create a topic and a table
words_topic = app.topic('words', value_type=str)
word_counts_table = app.Table('word-counts', default=int)

# Define a stream processing agent
@app.agent(words_topic)
async def count_words(words):
    async for word in words:
        word_counts_table[word] += 1

# Define a timer to print current counts
@app.timer(interval=10.0)
async def print_counts():
    for word, count in word_counts_table.items():
        print(f'{word}: {count}')

# Run the application
if __name__ == '__main__':
    app.main()
```

## Architecture

Faust follows a decorator-based architecture centered around the `App` class:

- **App**: Main application container that manages agents, topics, tables, and services
- **Agents**: Stream processing functions decorated with `@app.agent()` that consume from channels
- **Topics**: Named channels backed by Kafka topics for message distribution
- **Streams**: Async iterators over events in channels with transformation capabilities
- **Tables**: Distributed key-value stores with changelog-based replication for stateful processing
- **Events**: Message containers with key, value, headers, and acknowledgment capabilities
- **Models**: Structured data classes for type-safe serialization/deserialization

This design enables building scalable, fault-tolerant stream processing applications that integrate seamlessly with the Python ecosystem.

## Capabilities

### Core Application Framework

The foundational components for creating and managing Faust applications, including the main App class and decorators for defining agents, topics, tables, web endpoints, and CLI commands.

```python { .api }
class App:
    def __init__(self, id: str, *, broker: str = None, **kwargs): ...
    def agent(self, channel=None, *, name=None, concurrency=1, **kwargs): ...
    def topic(self, topic: str, *, key_type=None, value_type=None, **kwargs): ...
    def table(self, name: str, *, default=None, window=None, **kwargs): ...
    def timer(self, seconds: float, *, on_error=None, **kwargs): ...
    def crontab(self, cron_format: str, *, timezone=None, **kwargs): ...
    def command(self, *options, base=None, **kwargs): ...
    def page(self, path: str, *, cors_options=None, **kwargs): ...
    def main(self): ...
```

[Core Application](./core-application.md)
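
To make the decorator surface concrete, here is a minimal sketch that wires agents, timers, crontabs, and a web page onto one `App`. The application name, broker address, topic name, and endpoint path are placeholders chosen for this example.

```python
import faust

app = faust.App('orders-app', broker='kafka://localhost:9092')

# A topic the agent below consumes from (name chosen for this example).
orders_topic = app.topic('orders', value_type=bytes)

@app.agent(orders_topic)
async def process_orders(orders):
    # Consume raw order payloads from the 'orders' topic.
    async for order in orders:
        print(f'received order: {order!r}')

@app.timer(interval=30.0)
async def heartbeat():
    # Runs every 30 seconds while the worker is up.
    print('still alive')

@app.crontab('0 8 * * *')
async def daily_report():
    # Runs every day at 08:00 in the app's configured timezone.
    print('generating daily report')

@app.page('/health/')
async def health(web, request):
    # Simple HTTP endpoint served by the worker's embedded web server.
    return web.json({'status': 'ok'})

if __name__ == '__main__':
    # Hands control to the Faust CLI (e.g. `python app.py worker`).
    app.main()
```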

### Stream Processing

Stream processing components including agents for consuming data streams, stream transformation operations, and event handling for building real-time data processing pipelines.

```python { .api }
class Agent:
    async def send(self, value=None, *, key=None, partition=None): ...
    async def ask(self, value=None, *, key=None, reply_to=None, **kwargs): ...
    def cast(self, value=None, *, key=None, partition=None): ...

class Stream:
    def filter(self, fun): ...
    def map(self, fun): ...
    def group_by(self, key, *, name=None): ...
    def take(self, max_: int, *, within=None): ...
    def rate_limit(self, rate: float, *, per=1.0): ...
    def through(self, channel, **kwargs): ...

class Event:
    def ack(self): ...
    def reject(self): ...
    async def send(self, channel, key=None, value=None, **kwargs): ...
    async def forward(self, channel, *, key=None, value=None, **kwargs): ...

def current_event() -> Event: ...
```

[Stream Processing](./stream-processing.md)
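
The sketch below combines the agent and stream APIs: one agent chains `group_by` and `filter`, another acts as a request/reply endpoint for `ask()`. Names and thresholds are illustrative only.

```python
import faust

app = faust.App('stream-ops-app', broker='kafka://localhost:9092')

class Order(faust.Record):
    account_id: str
    amount: float

orders_topic = app.topic('orders', value_type=Order)

@app.agent(orders_topic)
async def alert_large_orders(orders):
    # Stream operators compose: group_by repartitions the stream so all orders
    # for an account share a partition; filter drops the small ones.
    async for order in orders.group_by(Order.account_id).filter(lambda o: o.amount >= 100.0):
        print(f'large order from {order.account_id}: {order.amount}')

@app.agent()
async def echo(requests):
    # A request/reply agent: the value it yields is returned to ask() callers.
    async for text in requests:
        yield text.upper()

@app.timer(interval=10.0)
async def drive():
    # Fire-and-forget delivery into an agent's channel.
    await alert_large_orders.send(value=Order(account_id='a1', amount=250.0))
    # Round trip: waits for the value `echo` yields for this request.
    reply = await echo.ask(value='ping')
    print(reply)
```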

### Topics and Channels

Topic and channel management for message distribution, including configuration, partitioning, serialization, and integration with Kafka infrastructure.

```python { .api }
class Topic:
    async def send(self, key=None, value=None, *, partition=None, **kwargs): ...
    def send_soon(self, key=None, value=None, *, partition=None, **kwargs): ...
    def stream(self, **kwargs) -> Stream: ...
    def events(self, **kwargs) -> Stream: ...
    def get_partition_key(self, key, partition): ...

class Channel:
    async def send(self, value=None, *, key=None, partition=None, **kwargs): ...
    def send_soon(self, value=None, *, key=None, partition=None, **kwargs): ...
    def stream(self, **kwargs) -> Stream: ...
    def events(self, **kwargs) -> Stream: ...
```

[Topics and Channels](./topics-channels.md)
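
A short sketch of producing to a Kafka-backed topic versus an in-memory channel; the topic name, keys, and values are placeholders.

```python
import faust

app = faust.App('producer-app', broker='kafka://localhost:9092')

# A Kafka-backed topic with typed keys and values.
clicks_topic = app.topic('page-clicks', key_type=str, value_type=str)

# An in-memory channel: same send/stream interface, but not backed by Kafka.
local_channel = app.channel(value_type=str)

@app.agent(local_channel)
async def consume_local(values):
    async for value in values:
        print(f'local: {value}')

@app.timer(interval=5.0)
async def produce():
    # Await delivery to Kafka...
    await clicks_topic.send(key='user-1', value='/index.html')
    # ...or schedule a send without awaiting its completion.
    local_channel.send_soon(value='ping')
```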

### Data Management

Stateful data management through tables and models, including distributed key-value storage, windowing operations, and structured data modeling with serialization.

```python { .api }
class Table:
    def __getitem__(self, key): ...
    def __setitem__(self, key, value): ...
    def get(self, key, default=None): ...
    def setdefault(self, key, default=None): ...
    def pop(self, key, *default): ...
    def items(self): ...
    def keys(self): ...
    def values(self): ...
    def clear(self): ...

class GlobalTable(Table): ...
class SetTable: ...
class SetGlobalTable: ...

class Model:
    def dumps(self, *, serializer=None) -> bytes: ...
    @classmethod
    def loads(cls, s: bytes, *, serializer=None): ...
    def asdict(self) -> dict: ...
    def derive(self, **fields): ...

class Record(Model): ...
class ModelOptions: ...

class FieldDescriptor: ...
class StringField(FieldDescriptor): ...

def maybe_model(arg) -> any: ...

registry: dict = {}
```

[Data Management](./data-management.md)
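
A rough sketch of models and tables working together; the record fields, topic, and table names are invented for illustration.

```python
import faust

app = faust.App('balances-app', broker='kafka://localhost:9092')

class Transfer(faust.Record, serializer='json'):
    account_id: str
    amount: float

transfers_topic = app.topic('transfers', value_type=Transfer)

# A changelog-backed table: state survives restarts and is replicated to standbys.
balances = app.Table('account-balances', default=float)

@app.agent(transfers_topic)
async def apply_transfer(transfers):
    async for transfer in transfers.group_by(Transfer.account_id):
        # Keyed by account so table updates stay co-partitioned with the stream.
        balances[transfer.account_id] += transfer.amount

# Models handle their own serialization:
payload = Transfer(account_id='a1', amount=9.95).dumps()   # bytes (JSON)
restored = Transfer.loads(payload)                         # back to a Transfer
```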

### Serialization

Data serialization and schema management for type-safe message handling, including codecs for different data formats and schema definitions for structured data.

```python { .api }
class Schema:
    def loads_key(self, app, message, *, loads=None, serializer=None): ...
    def loads_value(self, app, message, *, loads=None, serializer=None): ...
    def dumps_key(self, app, key, *, serializer=None) -> bytes: ...
    def dumps_value(self, app, value, *, serializer=None) -> bytes: ...

class Codec:
    def encode(self, obj) -> bytes: ...
    def decode(self, data: bytes): ...
```

[Serialization](./serialization.md)
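
A sketch of attaching a `Schema` to a topic so key and value handling live in one place; type and topic names are placeholders.

```python
import faust

app = faust.App('schema-app', broker='kafka://localhost:9092')

class Measurement(faust.Record):
    sensor_id: str
    value: float

# A Schema bundles key/value types and serializers, instead of passing
# key_type/value_type/serializers to the topic individually.
measurement_schema = faust.Schema(
    key_type=str,
    value_type=Measurement,
    key_serializer='json',
    value_serializer='json',
)

measurements_topic = app.topic('measurements', schema=measurement_schema)

@app.agent(measurements_topic)
async def handle(measurements):
    # .items() yields (key, value) pairs decoded through the schema.
    async for sensor_id, measurement in measurements.items():
        print(sensor_id, measurement.value)
```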

### Authentication

Security and authentication mechanisms for secure connections to Kafka brokers, including SSL, SASL, and GSSAPI credential management.

```python { .api }
class SSLCredentials:
    def __init__(self, *, context=None, purpose=None, cafile=None, **kwargs): ...

class SASLCredentials:
    def __init__(self, *, mechanism=None, username=None, password=None, **kwargs): ...

class GSSAPICredentials:
    def __init__(self, *, kerberos_service_name='kafka', **kwargs): ...
```

[Authentication](./authentication.md)
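
A sketch of passing credentials to the app via the `broker_credentials` setting; the broker address, certificate path, and account details are placeholders.

```python
import ssl

import faust

# TLS context for the broker connection.
ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH,
    cafile='/etc/ssl/certs/ca.pem',
)

# SASL/PLAIN over TLS.
credentials = faust.SASLCredentials(
    username='service-account',
    password='change-me',
    ssl_context=ssl_context,
)

app = faust.App(
    'secure-app',
    broker='kafka://broker.example.com:9094',
    broker_credentials=credentials,
)
```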

### Windowing

Time-based windowing operations for temporal data aggregation, including tumbling, hopping, and sliding window implementations for stream analytics.

```python { .api }
class Window: ...

class TumblingWindow(Window):
    def __init__(self, size: float, *, expires=None): ...

class HoppingWindow(Window):
    def __init__(self, size: float, step: float, *, expires=None): ...

class SlidingWindow(Window):
    def __init__(self, before: float, after: float, *, expires=None): ...
```

[Windowing](./windowing.md)
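
A sketch of a windowed table: counts are kept in non-overlapping 60-second buckets and read back relative to the current time. Topic and table names are illustrative.

```python
from datetime import timedelta

import faust

app = faust.App('windowing-app', broker='kafka://localhost:9092')

clicks_topic = app.topic('clicks', key_type=str, value_type=str)

# Tumbling window: non-overlapping 60-second buckets, retained for one hour.
# .hopping(size, step, ...) would create overlapping buckets instead.
click_counts = app.Table('click-counts', default=int).tumbling(
    size=60.0,
    expires=timedelta(hours=1),
)

@app.agent(clicks_topic)
async def count_clicks(clicks):
    async for page in clicks:
        click_counts[page] += 1
        # .now() reads this key's count in the window covering the current time;
        # .delta(30) would read the value as it stood 30 seconds ago.
        print(page, click_counts[page].now())
```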

### Monitoring and Sensors

Monitoring, metrics collection, and sensor framework for observability, including custom sensor implementations and integration with monitoring systems.

```python { .api }
class Sensor:
    def on_message_in(self, tp, offset, message): ...
    def on_message_out(self, tp, offset, message): ...
    def on_table_get(self, table, key): ...
    def on_table_set(self, table, key, value): ...
    def on_commit_completed(self, consumer, state): ...

class Monitor(Sensor): ...
```

[Monitoring](./monitoring.md)
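
A sketch of a custom sensor, assuming sensors are attached through `app.sensors.add()`; the metric it tracks is a toy example.

```python
import faust
from faust import Sensor

class InFlightCounter(Sensor):
    """Toy sensor counting messages currently being processed."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.in_flight = 0

    def on_message_in(self, tp, offset, message):
        # Called for every message received from the consumer.
        self.in_flight += 1

    def on_message_out(self, tp, offset, message):
        # Called when processing of the message has finished.
        self.in_flight -= 1

app = faust.App('monitored-app', broker='kafka://localhost:9092')
app.sensors.add(InFlightCounter())
```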

### CLI Framework

Command-line interface framework for building application-specific commands, including argument parsing, option handling, and integration with the Faust CLI system.

```python { .api }
class Command:
    def run(self, *args, **kwargs): ...

class AppCommand(Command):
    def run(self, *args, **kwargs): ...

class argument:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

class option:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

def call_command(command: str, args=None, **kwargs) -> tuple: ...
```

[CLI Framework](./cli-framework.md)
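
A sketch of an application-specific subcommand using the `option` helper; the command name, option, and output are invented for this example.

```python
import faust
from faust.cli import option

app = faust.App('cli-app', broker='kafka://localhost:9092')

@app.command(
    option('--limit', type=int, default=10, help='Maximum number of rows to print.'),
)
async def dump_state(self, limit: int):
    """Print a bounded view of worker state (example command)."""
    # `self` is the AppCommand instance; say() writes to the console.
    self.say(f'dumping up to {limit} rows')

if __name__ == '__main__':
    # The command shows up alongside the built-in subcommands (see --help).
    app.main()
```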

### Worker Management

Worker process management and service coordination, including application lifecycle management, process coordination, and service orchestration.

```python { .api }
class Worker:
    def start(self): ...
    def stop(self): ...
    def restart(self): ...

class Service: ...
class ServiceT: ...
```

[Worker Management](./worker-management.md)
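
A sketch of starting a worker programmatically instead of through the `faust` command; the application name and log level are arbitrary.

```python
import faust
from faust import Worker

app = faust.App('embedded-app', broker='kafka://localhost:9092')

if __name__ == '__main__':
    # Runs the app inside a Worker, equivalent to `faust -A module worker`.
    worker = Worker(app, loglevel='INFO')
    worker.execute_from_commandline()
```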

## Type Interfaces

Faust provides comprehensive type interfaces for static type checking:

```python { .api }
from typing import Protocol

class AppT(Protocol): ...
class AgentT(Protocol): ...
class ChannelT(Protocol): ...
class EventT(Protocol): ...
class StreamT(Protocol): ...
class TopicT(Protocol): ...
class ServiceT(Protocol): ...
```
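
In the installed package these interfaces are importable from `faust.types`; a brief sketch of using them as annotations (the helper function itself is hypothetical):

```python
from faust.types import StreamT, TopicT

def audit_stream(source: TopicT) -> StreamT:
    # Depend on the abstract interfaces rather than concrete classes,
    # which keeps helpers easy to test with stub implementations.
    return source.stream()
```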

## Configuration

Application configuration through the Settings class:

```python { .api }
class Settings:
    # Configuration options for brokers, serialization, etc.
    pass
```
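
Settings are normally supplied as keyword arguments when constructing the `App`; the values below are placeholders showing a few common options.

```python
import faust

app = faust.App(
    'config-demo',
    broker='kafka://localhost:9092',   # where to find Kafka
    store='rocksdb://',                # persist table state locally (requires python-rocksdb)
    value_serializer='json',           # default serializer for topic values
    topic_partitions=8,                # default partition count for new topics
    table_cleanup_interval=30.0,       # how often expired window data is cleaned up
)

# The resolved configuration is exposed as a Settings object:
print(app.conf.broker, app.conf.topic_partitions)
```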

## Utilities

Utility functions and helpers:

```python { .api }
def uuid() -> str: ...
```