0
# Topics and Channels
1
2
This section covers topic and channel management for message distribution in Faust applications. Topics represent Kafka topics with configuration, partitioning, and serialization support, while channels provide a generic interface for sending and receiving messages with flexible routing capabilities.
3
4
## Capabilities
5
6
### Topic Management
7
8
Kafka topic interface for publishing and subscribing to message streams. Topics provide type-safe message handling with configurable partitioning, serialization, and Kafka-specific settings like replication factor and retention policies.
9
10
```python { .api }
11
class Topic:
12
def __init__(
13
self,
14
app: App,
15
*,
16
topic: str,
17
key_type: type = None,
18
value_type: type = None,
19
key_serializer: str = None,
20
value_serializer: str = None,
21
partitions: int = None,
22
retention: int = None,
23
compacting: bool = None,
24
deleting: bool = None,
25
replicas: int = None,
26
acks: bool = True,
27
delivery_guarantee: str = 'at_least_once',
28
maxsize: int = None,
29
root: Topic = None,
30
config: dict = None,
31
**kwargs
32
):
33
"""
34
Create a new Kafka topic.
35
36
Args:
37
app: The Faust application instance
38
topic: Topic name in Kafka
39
key_type: Type for message keys (for serialization)
40
value_type: Type for message values (for serialization)
41
key_serializer: Serializer name for keys
42
value_serializer: Serializer name for values
43
partitions: Number of topic partitions
44
retention: Message retention time in seconds
45
compacting: Enable log compaction
46
deleting: Enable log deletion
47
replicas: Replication factor
48
acks: Whether messages are acknowledged (boolean flag, defaults to True)
49
delivery_guarantee: Message delivery guarantee
50
maxsize: Maximum queue size
51
root: Parent topic for derived topics
52
config: Additional Kafka topic configuration
53
"""
54
55
async def send(
56
self,
57
key: any = None,
58
value: any = None,
59
*,
60
partition: int = None,
61
timestamp: float = None,
62
headers: dict = None,
63
schema: Schema = None,
64
key_serializer: str = None,
65
value_serializer: str = None,
66
callback: callable = None,
67
force: bool = False
68
) -> FutureMessage:
69
"""
70
Send message to topic asynchronously.
71
72
Args:
73
key: Message key
74
value: Message value
75
partition: Target partition (optional)
76
timestamp: Message timestamp
77
headers: Message headers
78
schema: Custom schema for serialization
79
key_serializer: Override key serializer
80
value_serializer: Override value serializer
81
callback: Callback for send completion
82
force: Force send even if app not started
83
84
Returns:
85
Future representing the send operation
86
"""
87
88
def send_soon(
89
self,
90
key: any = None,
91
value: any = None,
92
*,
93
partition: int = None,
94
timestamp: float = None,
95
headers: dict = None,
96
schema: Schema = None,
97
key_serializer: str = None,
98
value_serializer: str = None,
99
callback: callable = None,
100
force: bool = False,
101
eager_partitioning: bool = None
102
) -> FutureMessage:
103
"""
104
Send message to topic without waiting for completion.
105
106
Args:
107
key: Message key
108
value: Message value
109
partition: Target partition (optional)
110
timestamp: Message timestamp
111
headers: Message headers
112
schema: Custom schema for serialization
113
key_serializer: Override key serializer
114
value_serializer: Override value serializer
115
callback: Callback for send completion
116
force: Force send even if app not started
117
eager_partitioning: Determine partition immediately
118
119
Returns:
120
Future representing the send operation
121
"""
122
123
def stream(self, **kwargs) -> Stream:
124
"""
125
Create a stream that consumes from this topic.
126
127
Args:
128
**kwargs: Stream configuration options
129
130
Returns:
131
Stream instance for processing messages
132
"""
133
134
def events(self, **kwargs) -> Stream:
135
"""
136
Create an event stream that consumes from this topic.
137
138
Args:
139
**kwargs: Stream configuration options
140
141
Returns:
142
Stream instance with Event objects
143
"""
144
145
def get_partition_key(self, key: any, partition: int = None) -> int:
146
"""
147
Get partition number for a given key.
148
149
Args:
150
key: Message key
151
partition: Explicit partition override
152
153
Returns:
154
Partition number for the key
155
"""
156
157
@property
158
def name(self) -> str:
159
"""Topic name in Kafka."""
160
161
@property
162
def key_type(self) -> type:
163
"""Type for message keys."""
164
165
@property
166
def value_type(self) -> type:
167
"""Type for message values."""
168
169
@property
170
def partitions(self) -> int:
171
"""Number of topic partitions."""
172
173
@property
174
def config(self) -> dict:
175
"""Kafka topic configuration."""
176
```
177
178
### Channel Interface
179
180
Generic communication channel interface for sending and receiving messages. Channels provide a unified abstraction over different transport mechanisms and can be used independently of Kafka topics for in-memory message passing or custom routing.
181
182
```python { .api }
183
class Channel:
184
def __init__(
185
self,
186
app: App,
187
*,
188
key_type: type = None,
189
value_type: type = None,
190
maxsize: int = None,
191
**kwargs
192
):
193
"""
194
Create a new communication channel.
195
196
Args:
197
app: The Faust application instance
198
key_type: Type for message keys
199
value_type: Type for message values
200
maxsize: Maximum queue size
201
"""
202
203
async def send(
204
self,
205
value: any = None,
206
*,
207
key: any = None,
208
partition: int = None,
209
timestamp: float = None,
210
headers: dict = None,
211
schema: Schema = None,
212
key_serializer: str = None,
213
value_serializer: str = None,
214
callback: callable = None,
215
force: bool = False
216
) -> any:
217
"""
218
Send message to channel asynchronously.
219
220
Args:
221
value: Message value
222
key: Message key (optional)
223
partition: Target partition (optional)
224
timestamp: Message timestamp
225
headers: Message headers
226
schema: Custom schema for serialization
227
key_serializer: Override key serializer
228
value_serializer: Override value serializer
229
callback: Callback for send completion
230
force: Force send even if app not started
231
232
Returns:
233
Result of the send operation
234
"""
235
236
def send_soon(
237
self,
238
value: any = None,
239
*,
240
key: any = None,
241
partition: int = None,
242
timestamp: float = None,
243
headers: dict = None,
244
schema: Schema = None,
245
key_serializer: str = None,
246
value_serializer: str = None,
247
callback: callable = None,
248
force: bool = False,
249
eager_partitioning: bool = None
250
) -> any:
251
"""
252
Send message to channel without waiting for completion.
253
254
Args:
255
value: Message value
256
key: Message key (optional)
257
partition: Target partition (optional)
258
timestamp: Message timestamp
259
headers: Message headers
260
schema: Custom schema for serialization
261
key_serializer: Override key serializer
262
value_serializer: Override value serializer
263
callback: Callback for send completion
264
force: Force send even if app not started
265
eager_partitioning: Determine partition immediately
266
267
Returns:
268
Future representing the send operation
269
"""
270
271
def stream(self, **kwargs) -> Stream:
272
"""
273
Create a stream that consumes from this channel.
274
275
Args:
276
**kwargs: Stream configuration options
277
278
Returns:
279
Stream instance for processing messages
280
"""
281
282
def events(self, **kwargs) -> Stream:
283
"""
284
Create an event stream that consumes from this channel.
285
286
Args:
287
**kwargs: Stream configuration options
288
289
Returns:
290
Stream instance with Event objects
291
"""
292
293
@property
294
def key_type(self) -> type:
295
"""Type for message keys."""
296
297
@property
298
def value_type(self) -> type:
299
"""Type for message values."""
300
301
@property
302
def maxsize(self) -> int:
303
"""Maximum queue size."""
304
```
305
306
### Topic Configuration
307
308
Utilities for advanced topic configuration and lifecycle management, covering Kafka-specific features and optimizations.
309
310
```python { .api }
311
def create_topic(
312
app: App,
313
topic: str,
314
*,
315
partitions: int = None,
316
replication_factor: int = None,
317
config: dict = None,
318
**kwargs
319
) -> Topic:
320
"""
321
Create a topic with specific configuration.
322
323
Args:
324
app: Faust application instance
325
topic: Topic name
326
partitions: Number of partitions
327
replication_factor: Replication factor
328
config: Kafka topic configuration
329
330
Returns:
331
Configured Topic instance
332
"""
333
334
class TopicManager:
335
def __init__(self, app: App):
336
"""
337
Topic lifecycle management.
338
339
Args:
340
app: Faust application instance
341
"""
342
343
async def create_topic(
344
self,
345
topic: str,
346
partitions: int,
347
replication_factor: int,
348
**config
349
) -> None:
350
"""
351
Create topic in Kafka cluster.
352
353
Args:
354
topic: Topic name
355
partitions: Number of partitions
356
replication_factor: Replication factor
357
**config: Additional topic configuration
358
"""
359
360
async def delete_topic(self, topic: str) -> None:
361
"""
362
Delete topic from Kafka cluster.
363
364
Args:
365
topic: Topic name to delete
366
"""
367
368
async def list_topics(self) -> list:
369
"""
370
List all topics in Kafka cluster.
371
372
Returns:
373
List of topic names
374
"""
375
```
376
377
## Usage Examples
378
379
### Basic Topic Usage
380
381
```python
382
import faust
383
384
app = faust.App('my-app', broker='kafka://localhost:9092')
385
386
# Define a topic with a declared value type
387
orders_topic = app.topic('orders', value_type=dict)
388
389
# Send messages to topic
390
@app.timer(interval=5.0)
391
async def produce_orders():
392
order = {'id': 123, 'product': 'widget', 'quantity': 5}
393
await orders_topic.send(key='order-123', value=order)
394
395
# Consume from topic
396
@app.agent(orders_topic)
397
async def process_orders(orders):
398
async for order in orders:
399
print(f"Processing order: {order}")
400
```
401
402
### Advanced Topic Configuration
403
404
```python
405
import time

# Topic with custom configuration
406
events_topic = app.topic(
407
'events',
408
key_type=str,
409
value_type=dict,
410
partitions=8,
411
retention=86400, # 24 hours
412
compacting=True,
413
config={
414
'cleanup.policy': 'compact',
415
'segment.bytes': 104857600, # 100MB
416
'min.cleanable.dirty.ratio': 0.1
417
}
418
)
419
420
# Send with custom serialization
421
await events_topic.send(
422
key='user-123',
423
value={'event': 'login', 'timestamp': time.time()},
424
headers={'source': 'web', 'version': '1.0'},
425
value_serializer='json'
426
)
427
```
428
429
### Channel-based Communication
430
431
```python
432
import time

# In-memory channel for internal communication
433
notifications_channel = app.channel(value_type=dict)
434
435
@app.agent(notifications_channel)
436
async def handle_notifications(notifications):
437
async for notification in notifications:
438
print(f"Notification: {notification['message']}")
439
440
# Send to channel from anywhere in the application
441
async def send_notification(message: str, user_id: str):
442
await notifications_channel.send(value={
443
'message': message,
444
'user_id': user_id,
445
'timestamp': time.time()
446
})
447
```
448
449
### Topic Partitioning
450
451
```python
452
# Custom partitioning logic
453
user_events_topic = app.topic('user-events', key_type=str, value_type=dict)
454
455
async def send_user_event(user_id: str, event_data: dict):
456
# Ensure events for the same user go to the same partition
457
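import zlib  # stdlib; imported locally so this snippet stays self-contained
# str hashes from built-in hash() are salted per process, so use a stable checksum
partition = zlib.crc32(user_id.encode('utf-8')) % user_events_topic.partitions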
458
459
await user_events_topic.send(
460
key=user_id,
461
value=event_data,
462
partition=partition
463
)
464
```
465
466
## Type Interfaces
467
468
```python { .api }
469
from typing import Protocol, Optional, Dict, Any, Callable, AsyncIterator
470
471
class TopicT(Protocol):
472
"""Type interface for Topic."""
473
474
name: str
475
key_type: Optional[type]
476
value_type: Optional[type]
477
partitions: int
478
479
async def send(
480
self,
481
key: Any = None,
482
value: Any = None,
483
*,
484
partition: Optional[int] = None,
485
**kwargs
486
) -> Any: ...
487
488
def stream(self, **kwargs) -> 'StreamT': ...
489
490
class ChannelT(Protocol):
491
"""Type interface for Channel."""
492
493
key_type: Optional[type]
494
value_type: Optional[type]
495
maxsize: Optional[int]
496
497
async def send(
498
self,
499
value: Any = None,
500
*,
501
key: Any = None,
502
**kwargs
503
) -> Any: ...
504
505
def stream(self, **kwargs) -> 'StreamT': ...
506
```