or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

clients.mdexceptions.mdindex.mdintegration.mdmanagers.mdnamespaces.mdservers.md

managers.mddocs/

0

# Manager Classes

1

2

Client connection managers that handle message routing, room membership, and horizontal scaling through various pub/sub messaging systems. Managers enable Socket.IO applications to scale across multiple processes and servers while maintaining consistent real-time communication.

3

4

## Capabilities

5

6

### Manager

7

8

Basic client manager for single-process Socket.IO servers. Handles client connections, message routing, and room management within a single server instance.

9

10

```python { .api }

11

class Manager:

12

"""

13

Basic client manager for single-process servers.

14

15

Inherits from: BaseManager

16

17

Attributes:

18

logger: Logger instance for manager events

19

"""

20

21

def __init__(self, logger=False):

22

"""

23

Initialize the manager.

24

25

Args:

26

logger (bool or Logger): Enable logging or provide custom logger

27

"""

28

29

def can_disconnect(self, sid, namespace):

30

"""

31

Check if a client can be disconnected.

32

33

Args:

34

sid (str): Client session ID

35

namespace (str): Target namespace

36

37

Returns:

38

bool: True if client can be disconnected

39

"""

40

41

def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):

42

"""

43

Emit an event to connected clients.

44

45

Args:

46

event (str): Event name

47

data: Event data to send

48

namespace (str): Target namespace

49

room (str or list): Target room name(s)

50

skip_sid (str): Skip this client session ID

51

callback (callable): Callback function for response

52

**kwargs: Additional parameters (to, etc.)

53

"""

54

55

def disconnect(self, sid, namespace=None):

56

"""

57

Disconnect a client.

58

59

Args:

60

sid (str): Client session ID

61

namespace (str): Target namespace

62

"""

63

64

def enter_room(self, sid, namespace, room, eio_sid=None):

65

"""

66

Add a client to a room.

67

68

Args:

69

sid (str): Client session ID

70

namespace (str): Target namespace

71

room (str): Room name

72

eio_sid (str): Engine.IO session ID

73

"""

74

75

def leave_room(self, sid, namespace, room):

76

"""

77

Remove a client from a room.

78

79

Args:

80

sid (str): Client session ID

81

namespace (str): Target namespace

82

room (str): Room name

83

"""

84

85

def close_room(self, room, namespace):

86

"""

87

Remove all clients from a room and delete the room.

88

89

Args:

90

room (str): Room name

91

namespace (str): Target namespace

92

"""

93

94

def get_participants(self, namespace, room):

95

"""

96

Get the list of clients in a room.

97

98

Args:

99

namespace (str): Target namespace

100

room (str): Room name

101

102

Returns:

103

list: Client session IDs in the room

104

"""

105

106

def trigger_callback(self, sid, id, data):

107

"""

108

Trigger a callback function for a client.

109

110

Args:

111

sid (str): Client session ID

112

id (int): Callback ID

113

data: Callback data

114

"""

115

```

116

117

#### Usage Example

118

119

```python

120

import socketio

121

122

# Create server with basic manager

123

manager = socketio.Manager(logger=True)

124

sio = socketio.Server(client_manager=manager)

125

126

@sio.event

127

def connect(sid, environ):

128

print(f'Client {sid} connected')

129

130

@sio.event

131

def join_room(sid, data):

132

room = data['room']

133

sio.enter_room(sid, room)

134

# Manager handles the room membership internally

135

136

app = socketio.WSGIApp(sio)

137

```

138

139

### AsyncManager

140

141

Asynchronous version of the basic Manager for asyncio-based Socket.IO servers.

142

143

```python { .api }

144

class AsyncManager:

145

"""

146

Asyncio client manager.

147

148

Inherits from: BaseManager

149

150

Attributes:

151

logger: Logger instance for manager events

152

"""

153

154

def __init__(self, logger=False):

155

"""

156

Initialize the async manager.

157

158

Args:

159

logger (bool or Logger): Enable logging or provide custom logger

160

"""

161

162

async def can_disconnect(self, sid, namespace):

163

"""

164

Check if a client can be disconnected.

165

166

Args:

167

sid (str): Client session ID

168

namespace (str): Target namespace

169

170

Returns:

171

bool: True if client can be disconnected

172

"""

173

174

async def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):

175

"""

176

Emit an event to connected clients.

177

178

Args:

179

event (str): Event name

180

data: Event data to send

181

namespace (str): Target namespace

182

room (str or list): Target room name(s)

183

skip_sid (str): Skip this client session ID

184

callback (coroutine): Async callback function for response

185

**kwargs: Additional parameters

186

"""

187

188

async def disconnect(self, sid, namespace=None):

189

"""

190

Disconnect a client.

191

192

Args:

193

sid (str): Client session ID

194

namespace (str): Target namespace

195

"""

196

197

async def enter_room(self, sid, namespace, room, eio_sid=None):

198

"""

199

Add a client to a room.

200

201

Args:

202

sid (str): Client session ID

203

namespace (str): Target namespace

204

room (str): Room name

205

eio_sid (str): Engine.IO session ID

206

"""

207

208

async def leave_room(self, sid, namespace, room):

209

"""

210

Remove a client from a room.

211

212

Args:

213

sid (str): Client session ID

214

namespace (str): Target namespace

215

room (str): Room name

216

"""

217

218

async def close_room(self, room, namespace):

219

"""

220

Remove all clients from a room and delete the room.

221

222

Args:

223

room (str): Room name

224

namespace (str): Target namespace

225

"""

226

227

async def get_participants(self, namespace, room):

228

"""

229

Get the list of clients in a room.

230

231

Args:

232

namespace (str): Target namespace

233

room (str): Room name

234

235

Returns:

236

list: Client session IDs in the room

237

"""

238

239

async def trigger_callback(self, sid, id, data):

240

"""

241

Trigger a callback function for a client.

242

243

Args:

244

sid (str): Client session ID

245

id (int): Callback ID

246

data: Callback data

247

"""

248

```

249

250

### PubSubManager

251

252

Base class for pub/sub-based client managers that enable horizontal scaling across multiple server processes through external message brokers.

253

254

```python { .api }

255

class PubSubManager(Manager):

256

"""

257

Base class for pub/sub-based client managers.

258

259

Inherits from: Manager

260

261

Attributes:

262

channel (str): Message channel name

263

write_only (bool): Write-only mode (no message listening)

264

logger: Logger instance

265

"""

266

267

def __init__(self, channel='socketio', write_only=False, logger=None):

268

"""

269

Initialize the pub/sub manager.

270

271

Args:

272

channel (str): Message channel name (default: 'socketio')

273

write_only (bool): Write-only mode - don't listen for messages (default: False)

274

logger (Logger): Logger instance

275

"""

276

277

def _publish(self, data):

278

"""

279

Publish a message to the message broker.

280

281

Args:

282

data: Message data to publish

283

284

Note:

285

This is an abstract method that must be implemented by subclasses.

286

"""

287

raise NotImplementedError

288

289

def _listen(self):

290

"""

291

Listen for messages from the message broker.

292

293

Note:

294

This is an abstract method that must be implemented by subclasses.

295

"""

296

raise NotImplementedError

297

```

298

299

### RedisManager

300

301

Redis-based client manager that uses Redis pub/sub for horizontal scaling across multiple server instances.

302

303

```python { .api }

304

class RedisManager(PubSubManager):

305

"""

306

Redis-based client manager (synchronous).

307

308

Inherits from: PubSubManager

309

310

Attributes:

311

redis_url (str): Redis connection URL

312

channel (str): Redis channel name

313

redis: Redis client instance

314

"""

315

316

def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False,

317

logger=None, redis_options=None):

318

"""

319

Initialize the Redis manager.

320

321

Args:

322

url (str): Redis connection URL (default: 'redis://localhost:6379/0')

323

channel (str): Redis channel name (default: 'socketio')

324

write_only (bool): Write-only mode (default: False)

325

logger (Logger): Logger instance

326

redis_options (dict): Additional Redis client options

327

"""

328

329

def _publish(self, data):

330

"""

331

Publish a message to Redis.

332

333

Args:

334

data: Message data to publish

335

"""

336

337

def _listen(self):

338

"""

339

Listen for messages from Redis pub/sub.

340

"""

341

```

342

343

#### Usage Example

344

345

```python

346

import socketio

347

348

# Create Redis manager for scaling

349

redis_manager = socketio.RedisManager(

350

url='redis://localhost:6379/0',

351

channel='my-app-socketio',

352

logger=True

353

)

354

355

# Create server with Redis manager

356

sio = socketio.Server(client_manager=redis_manager)

357

358

@sio.event

359

def connect(sid, environ):

360

print(f'Client {sid} connected')

361

sio.enter_room(sid, 'global')

362

363

@sio.event

364

def broadcast_message(sid, data):

365

# This message will be broadcast across all server instances

366

sio.emit('message', data, room='global')

367

368

# Multiple server instances can use the same Redis manager

369

# and will share room memberships and message routing

370

app = socketio.WSGIApp(sio)

371

```

372

373

### AsyncRedisManager

374

375

Asynchronous Redis-based client manager for asyncio applications.

376

377

```python { .api }

378

class AsyncRedisManager(AsyncPubSubManager):

379

"""

380

Async Redis-based client manager.

381

382

Inherits from: AsyncPubSubManager

383

384

Attributes:

385

redis_url (str): Redis connection URL

386

channel (str): Redis channel name

387

redis: Async Redis client instance

388

"""

389

390

def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False,

391

logger=None, redis_options=None):

392

"""

393

Initialize the async Redis manager.

394

395

Args:

396

url (str): Redis connection URL (default: 'redis://localhost:6379/0')

397

channel (str): Redis channel name (default: 'socketio')

398

write_only (bool): Write-only mode (default: False)

399

logger (Logger): Logger instance

400

redis_options (dict): Additional Redis client options

401

"""

402

403

async def _publish(self, data):

404

"""

405

Publish a message to Redis.

406

407

Args:

408

data: Message data to publish

409

"""

410

411

async def _listen(self):

412

"""

413

Listen for messages from Redis pub/sub.

414

"""

415

```

416

417

#### Usage Example

418

419

```python

420

import socketio

421

422

# Create async Redis manager

423

redis_manager = socketio.AsyncRedisManager(

424

url='redis://redis-cluster:6379/0',

425

channel='async-app-socketio',

426

logger=True,

427

redis_options={'socket_timeout': 5}

428

)

429

430

# Create async server with Redis manager

431

sio = socketio.AsyncServer(client_manager=redis_manager)

432

433

@sio.event

434

async def connect(sid, environ):

435

print(f'Client {sid} connected')

436

await sio.enter_room(sid, 'async-global')

437

438

@sio.event

439

async def broadcast_message(sid, data):

440

# Broadcast across all async server instances

441

await sio.emit('message', data, room='async-global')

442

443

app = socketio.ASGIApp(sio)

444

```

445

446

### KafkaManager

447

448

Apache Kafka-based client manager for high-throughput, distributed message routing.

449

450

```python { .api }

451

class KafkaManager(PubSubManager):

452

"""

453

Apache Kafka-based client manager.

454

455

Inherits from: PubSubManager

456

457

Attributes:

458

kafka_url (str): Kafka broker URL

459

topic (str): Kafka topic name

460

"""

461

462

def __init__(self, url='kafka://localhost:9092', channel='socketio', write_only=False, logger=None):

463

"""

464

Initialize the Kafka manager.

465

466

Args:

467

url (str): Kafka broker URL (default: 'kafka://localhost:9092')

468

channel (str): Kafka topic name (default: 'socketio')

469

write_only (bool): Write-only mode (default: False)

470

logger (Logger): Logger instance

471

"""

472

473

def _publish(self, data):

474

"""

475

Publish a message to Kafka topic.

476

477

Args:

478

data: Message data to publish

479

"""

480

481

def _listen(self):

482

"""

483

Listen for messages from Kafka topic.

484

"""

485

```

486

487

#### Usage Example

488

489

```python

490

import socketio

491

492

# Create Kafka manager for high-throughput scaling

493

kafka_manager = socketio.KafkaManager(

494

url='kafka://kafka-broker1:9092,kafka-broker2:9092',

495

channel='realtime-events',

496

logger=True

497

)

498

499

sio = socketio.Server(client_manager=kafka_manager)

500

501

@sio.event

502

def connect(sid, environ):

503

sio.enter_room(sid, 'high-volume')

504

505

@sio.event

506

def process_event(sid, data):

507

# High-throughput event processing across Kafka cluster

508

sio.emit('event_processed', data, room='high-volume')

509

510

app = socketio.WSGIApp(sio)

511

```

512

513

### KombuManager

514

515

Kombu-based client manager supporting multiple message brokers including RabbitMQ, Redis, and others through the Kombu library.

516

517

```python { .api }

518

class KombuManager(PubSubManager):

519

"""

520

Kombu-based client manager for various message brokers.

521

522

Inherits from: PubSubManager

523

524

Attributes:

525

url (str): Message broker URL

526

channel (str): Message channel/queue name

527

"""

528

529

def __init__(self, url=None, channel='socketio', write_only=False, logger=None):

530

"""

531

Initialize the Kombu manager.

532

533

Args:

534

url (str): Message broker URL (format depends on broker type)

535

channel (str): Message channel/queue name (default: 'socketio')

536

write_only (bool): Write-only mode (default: False)

537

logger (Logger): Logger instance

538

539

Supported URLs:

540

- RabbitMQ: 'amqp://user:pass@host:port/vhost'

541

- Redis: 'redis://localhost:6379/0'

542

- SQS: 'sqs://access_key:secret_key@region'

543

"""

544

545

def _publish(self, data):

546

"""

547

Publish a message via Kombu.

548

549

Args:

550

data: Message data to publish

551

"""

552

553

def _listen(self):

554

"""

555

Listen for messages via Kombu.

556

"""

557

```

558

559

#### Usage Example

560

561

```python

562

import socketio

563

564

# RabbitMQ with Kombu

565

rabbitmq_manager = socketio.KombuManager(

566

url='amqp://user:password@rabbitmq-server:5672/myvhost',

567

channel='socketio-events',

568

logger=True

569

)

570

571

sio = socketio.Server(client_manager=rabbitmq_manager)

572

573

@sio.event

574

def connect(sid, environ):

575

sio.enter_room(sid, 'rabbitmq-room')

576

577

app = socketio.WSGIApp(sio)

578

```

579

580

### ZmqManager

581

582

ZeroMQ-based client manager for experimental message routing (requires eventlet and external ZMQ message broker).

583

584

```python { .api }

585

class ZmqManager(PubSubManager):

586

"""

587

ZeroMQ-based client manager (experimental).

588

589

Inherits from: PubSubManager

590

591

Attributes:

592

zmq_url (str): ZeroMQ connection URL

593

channel (str): Channel name

594

"""

595

596

def __init__(self, url='zmq+tcp://localhost:5555', channel='socketio', write_only=False, logger=None):

597

"""

598

Initialize the ZeroMQ manager.

599

600

Args:

601

url (str): ZeroMQ connection URL (default: 'zmq+tcp://localhost:5555')

602

channel (str): Channel name (default: 'socketio')

603

write_only (bool): Write-only mode (default: False)

604

logger (Logger): Logger instance

605

606

Requirements:

607

- eventlet must be used as the async mode

608

- External ZMQ message broker is required

609

"""

610

611

def _publish(self, data):

612

"""

613

Publish a message via ZeroMQ.

614

615

Args:

616

data: Message data to publish

617

"""

618

619

def _listen(self):

620

"""

621

Listen for messages via ZeroMQ.

622

"""

623

```

624

625

### AsyncAioPikaManager

626

627

Asynchronous RabbitMQ client manager using the aio_pika library for asyncio applications.

628

629

```python { .api }

630

class AsyncAioPikaManager(AsyncPubSubManager):

631

"""

632

Async RabbitMQ client manager using aio_pika.

633

634

Inherits from: AsyncPubSubManager

635

636

Attributes:

637

rabbitmq_url (str): RabbitMQ connection URL

638

channel (str): Exchange/queue name

639

"""

640

641

def __init__(self, url='amqp://guest:guest@localhost:5672//', channel='socketio',

642

write_only=False, logger=None):

643

"""

644

Initialize the async RabbitMQ manager.

645

646

Args:

647

url (str): RabbitMQ connection URL (default: 'amqp://guest:guest@localhost:5672//')

648

channel (str): Exchange/queue name (default: 'socketio')

649

write_only (bool): Write-only mode (default: False)

650

logger (Logger): Logger instance

651

"""

652

653

async def _publish(self, data):

654

"""

655

Publish a message to RabbitMQ.

656

657

Args:

658

data: Message data to publish

659

"""

660

661

async def _listen(self):

662

"""

663

Listen for messages from RabbitMQ.

664

"""

665

```

666

667

#### Usage Example

668

669

```python

670

import socketio

671

672

# Create async RabbitMQ manager

673

rabbitmq_manager = socketio.AsyncAioPikaManager(

674

url='amqp://user:password@rabbitmq-cluster:5672/production',

675

channel='async-socketio-events',

676

logger=True

677

)

678

679

sio = socketio.AsyncServer(client_manager=rabbitmq_manager)

680

681

@sio.event

682

async def connect(sid, environ):

683

await sio.enter_room(sid, 'async-rabbitmq-room')

684

685

@sio.event

686

async def process_async_event(sid, data):

687

# Process with async RabbitMQ scaling

688

await sio.emit('async_event_processed', data, room='async-rabbitmq-room')

689

690

app = socketio.ASGIApp(sio)

691

```

692

693

## Manager Selection Guidelines

694

695

Choose the appropriate manager based on your scaling and infrastructure requirements:

696

697

- **Manager/AsyncManager**: Single-process applications, development, simple deployments

698

- **RedisManager/AsyncRedisManager**: Multi-process scaling, moderate traffic, simple setup

699

- **KafkaManager**: High-throughput applications, event streaming, complex distributed systems

700

- **KombuManager**: Flexible broker support, existing RabbitMQ/AMQP infrastructure

701

- **AsyncAioPikaManager**: Async RabbitMQ with advanced features, asyncio applications

702

- **ZmqManager**: Experimental, specialized use cases requiring ZeroMQ

703

704

## Write-Only Mode

705

706

All pub/sub managers support write-only mode for scenarios where you only need to send messages but not receive them from other server instances:

707

708

```python

709

# Write-only Redis manager (doesn't listen for messages)

710

write_only_manager = socketio.RedisManager(

711

url='redis://localhost:6379/0',

712

write_only=True

713

)

714

```