
# Topics and Channels

Topic and channel management for message distribution in Faust applications. Topics represent Kafka topics with configuration, partitioning, and serialization support, while channels provide a generic interface for sending and receiving messages with flexible routing capabilities.

## Capabilities

### Topic Management

Kafka topic interface for publishing and subscribing to message streams. Topics provide type-safe message handling with configurable partitioning, serialization, and Kafka-specific settings like replication factor and retention policies.

```python { .api }
class Topic:
    def __init__(
        self,
        app: App,
        *,
        topic: str,
        key_type: type = None,
        value_type: type = None,
        key_serializer: str = None,
        value_serializer: str = None,
        partitions: int = None,
        retention: int = None,
        compacting: bool = None,
        deleting: bool = None,
        replicas: int = None,
        acks: bool = True,
        delivery_guarantee: str = 'at_least_once',
        maxsize: int = None,
        root: Topic = None,
        config: dict = None,
        **kwargs
    ):
        """
        Create a new Kafka topic.

        Args:
            app: The Faust application instance
            topic: Topic name in Kafka
            key_type: Type for message keys (for serialization)
            value_type: Type for message values (for serialization)
            key_serializer: Serializer name for keys
            value_serializer: Serializer name for values
            partitions: Number of topic partitions
            retention: Message retention time in seconds
            compacting: Enable log compaction
            deleting: Enable log deletion
            replicas: Replication factor
            acks: Acknowledgment level
            delivery_guarantee: Message delivery guarantee
            maxsize: Maximum queue size
            root: Parent topic for derived topics
            config: Additional Kafka topic configuration
        """

    async def send(
        self,
        key: any = None,
        value: any = None,
        *,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        schema: Schema = None,
        key_serializer: str = None,
        value_serializer: str = None,
        callback: callable = None,
        force: bool = False
    ) -> FutureMessage:
        """
        Send message to topic asynchronously.

        Args:
            key: Message key
            value: Message value
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started

        Returns:
            Future representing the send operation
        """

    def send_soon(
        self,
        key: any = None,
        value: any = None,
        *,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        schema: Schema = None,
        key_serializer: str = None,
        value_serializer: str = None,
        callback: callable = None,
        force: bool = False,
        eager_partitioning: bool = None
    ) -> FutureMessage:
        """
        Send message to topic without waiting for completion.

        Args:
            key: Message key
            value: Message value
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started
            eager_partitioning: Determine partition immediately

        Returns:
            Future representing the send operation
        """

    def stream(self, **kwargs) -> Stream:
        """
        Create a stream that consumes from this topic.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance for processing messages
        """

    def events(self, **kwargs) -> Stream:
        """
        Create an event stream that consumes from this topic.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance with Event objects
        """

    def get_partition_key(self, key: any, partition: int = None) -> int:
        """
        Get partition number for a given key.

        Args:
            key: Message key
            partition: Explicit partition override

        Returns:
            Partition number for the key
        """

    @property
    def name(self) -> str:
        """Topic name in Kafka."""

    @property
    def key_type(self) -> type:
        """Type for message keys."""

    @property
    def value_type(self) -> type:
        """Type for message values."""

    @property
    def partitions(self) -> int:
        """Number of topic partitions."""

    @property
    def config(self) -> dict:
        """Kafka topic configuration."""
```

### Channel Interface

Generic communication channel interface for sending and receiving messages. Channels provide a unified abstraction over different transport mechanisms and can be used independently of Kafka topics for in-memory message passing or custom routing.

```python { .api }
class Channel:
    def __init__(
        self,
        app: App,
        *,
        key_type: type = None,
        value_type: type = None,
        maxsize: int = None,
        **kwargs
    ):
        """
        Create a new communication channel.

        Args:
            app: The Faust application instance
            key_type: Type for message keys
            value_type: Type for message values
            maxsize: Maximum queue size
        """

    async def send(
        self,
        value: any = None,
        *,
        key: any = None,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        schema: Schema = None,
        key_serializer: str = None,
        value_serializer: str = None,
        callback: callable = None,
        force: bool = False
    ) -> any:
        """
        Send message to channel asynchronously.

        Args:
            value: Message value
            key: Message key (optional)
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started

        Returns:
            Result of the send operation
        """

    def send_soon(
        self,
        value: any = None,
        *,
        key: any = None,
        partition: int = None,
        timestamp: float = None,
        headers: dict = None,
        schema: Schema = None,
        key_serializer: str = None,
        value_serializer: str = None,
        callback: callable = None,
        force: bool = False,
        eager_partitioning: bool = None
    ) -> any:
        """
        Send message to channel without waiting for completion.

        Args:
            value: Message value
            key: Message key (optional)
            partition: Target partition (optional)
            timestamp: Message timestamp
            headers: Message headers
            schema: Custom schema for serialization
            key_serializer: Override key serializer
            value_serializer: Override value serializer
            callback: Callback for send completion
            force: Force send even if app not started
            eager_partitioning: Determine partition immediately

        Returns:
            Future representing the send operation
        """

    def stream(self, **kwargs) -> Stream:
        """
        Create a stream that consumes from this channel.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance for processing messages
        """

    def events(self, **kwargs) -> Stream:
        """
        Create an event stream that consumes from this channel.

        Args:
            **kwargs: Stream configuration options

        Returns:
            Stream instance with Event objects
        """

    @property
    def key_type(self) -> type:
        """Type for message keys."""

    @property
    def value_type(self) -> type:
        """Type for message values."""

    @property
    def maxsize(self) -> int:
        """Maximum queue size."""
```

### Topic Configuration

Advanced topic configuration and management utilities for Kafka-specific features and optimizations.

```python { .api }
def create_topic(
    app: App,
    topic: str,
    *,
    partitions: int = None,
    replication_factor: int = None,
    config: dict = None,
    **kwargs
) -> Topic:
    """
    Create a topic with specific configuration.

    Args:
        app: Faust application instance
        topic: Topic name
        partitions: Number of partitions
        replication_factor: Replication factor
        config: Kafka topic configuration

    Returns:
        Configured Topic instance
    """

class TopicManager:
    def __init__(self, app: App):
        """
        Topic lifecycle management.

        Args:
            app: Faust application instance
        """

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication_factor: int,
        **config
    ) -> None:
        """
        Create topic in Kafka cluster.

        Args:
            topic: Topic name
            partitions: Number of partitions
            replication_factor: Replication factor
            **config: Additional topic configuration
        """

    async def delete_topic(self, topic: str) -> None:
        """
        Delete topic from Kafka cluster.

        Args:
            topic: Topic name to delete
        """

    async def list_topics(self) -> list:
        """
        List all topics in Kafka cluster.

        Returns:
            List of topic names
        """
```

## Usage Examples

### Basic Topic Usage

```python
import faust

app = faust.App('my-app', broker='kafka://localhost:9092')

# Define a topic with type annotations
orders_topic = app.topic('orders', value_type=dict)

# Send messages to topic
@app.timer(interval=5.0)
async def produce_orders():
    order = {'id': 123, 'product': 'widget', 'quantity': 5}
    await orders_topic.send(key='order-123', value=order)

# Consume from topic
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f"Processing order: {order}")
```

### Advanced Topic Configuration

```python
# Topic with custom configuration
events_topic = app.topic(
    'events',
    key_type=str,
    value_type=dict,
    partitions=8,
    retention=86400,  # 24 hours
    compacting=True,
    config={
        'cleanup.policy': 'compact',
        'segment.bytes': 104857600,  # 100MB
        'min.cleanable.dirty.ratio': 0.1
    }
)

# Send with custom serialization
await events_topic.send(
    key='user-123',
    value={'event': 'login', 'timestamp': time.time()},
    headers={'source': 'web', 'version': '1.0'},
    value_serializer='json'
)
```

### Channel-based Communication

```python
# In-memory channel for internal communication
notifications_channel = app.channel(value_type=dict)

@app.agent(notifications_channel)
async def handle_notifications(notifications):
    async for notification in notifications:
        print(f"Notification: {notification['message']}")

# Send to channel from anywhere in the application
async def send_notification(message: str, user_id: str):
    await notifications_channel.send({
        'message': message,
        'user_id': user_id,
        'timestamp': time.time()
    })
```

### Topic Partitioning

```python
# Custom partitioning logic
user_events_topic = app.topic('user-events', key_type=str, value_type=dict)

async def send_user_event(user_id: str, event_data: dict):
    # Ensure events for the same user go to the same partition
    partition = hash(user_id) % user_events_topic.partitions

    await user_events_topic.send(
        key=user_id,
        value=event_data,
        partition=partition
    )
```

## Type Interfaces

```python { .api }
from typing import Protocol, Optional, Dict, Any, Callable, AsyncIterator

class TopicT(Protocol):
    """Type interface for Topic."""

    name: str
    key_type: Optional[type]
    value_type: Optional[type]
    partitions: int

    async def send(
        self,
        key: Any = None,
        value: Any = None,
        *,
        partition: Optional[int] = None,
        **kwargs
    ) -> Any: ...

    def stream(self, **kwargs) -> 'StreamT': ...

class ChannelT(Protocol):
    """Type interface for Channel."""

    key_type: Optional[type]
    value_type: Optional[type]
    maxsize: Optional[int]

    async def send(
        self,
        value: Any = None,
        *,
        key: Any = None,
        **kwargs
    ) -> Any: ...

    def stream(self, **kwargs) -> 'StreamT': ...
```