or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

address-endpoints.md, async-operations.md, authentication.md, client-apis.md, connection-session.md, error-handling.md, high-level-messaging.md, index.md, low-level-protocol.md, message-management.md, types-constants.md

docs/connection-session.md

0

# Connection and Session Management

1

2

Low-level AMQP protocol management including connection establishment, session management, and link creation for advanced messaging scenarios that require fine-grained control over AMQP protocol behavior.

3

4

## Capabilities

5

6

### Connection Management

7

8

Low-level AMQP connection management that handles the network connection, authentication, and protocol handshake.

9

10

```python { .api }

11

class Connection:

12

def __init__(self, hostname, sasl=None, container_id=None, max_frame_size=None,

13

channel_max=None, idle_timeout=None, properties=None,

14

remote_idle_timeout_empty_frame_send_ratio=None, debug=False,

15

encoding='UTF-8'):

16

"""

17

AMQP connection management.

18

19

Parameters:

20

- hostname (str): AMQP broker hostname

21

- sasl (AMQPAuth): SASL authentication mechanism

22

- container_id (str): AMQP container identifier

23

- max_frame_size (int): Maximum frame size in bytes

24

- channel_max (int): Maximum number of channels/sessions

25

- idle_timeout (int): Connection idle timeout in milliseconds

26

- properties (dict): Connection properties

27

- remote_idle_timeout_empty_frame_send_ratio (float): Empty frame ratio

28

- debug (bool): Enable protocol debug logging

29

- encoding (str): Character encoding

30

"""

31

```

32

33

**Key Methods:**

34

35

```python { .api }

36

def open(self):

37

"""Open the AMQP connection."""

38

39

def close(self):

40

"""Close the AMQP connection."""

41

42

def work(self):

43

"""Process connection work (I/O and protocol handling)."""

44

45

def sleep(self, seconds):

46

"""Sleep while continuing to service the connection."""

47

48

def destroy(self):

49

"""Destroy the connection and free resources."""

50

```

51

52

**Key Properties:**

53

54

```python { .api }

55

@property

56

def container_id: str

57

"""AMQP container identifier."""

58

59

@property

60

def hostname: str

61

"""Broker hostname."""

62

63

@property

64

def max_frame_size: int

65

"""Maximum frame size in bytes."""

66

67

@property

68

def remote_max_frame_size: int

69

"""Remote peer's maximum frame size."""

70

71

@property

72

def channel_max: int

73

"""Maximum number of channels."""

74

75

@property

76

def idle_timeout: int

77

"""Idle timeout in milliseconds."""

78

```

79

80

**Usage Examples:**

81

82

```python

83

from uamqp import Connection

84

from uamqp.authentication import SASLPlain

85

86

# Basic connection

87

auth = SASLPlain("amqp.example.com", "user", "password")

88

connection = Connection("amqp.example.com", sasl=auth)

89

90

try:

91

connection.open()

92

print(f"Connected to {connection.hostname}")

93

94

# Connection is ready for session creation

95

# ... create sessions and links

96

97

# Keep connection alive

98

connection.work()

99

100

finally:

101

connection.close()

102

103

# Connection with custom properties

104

properties = {

105

'product': 'MyApp',

106

'version': '1.0.0',

107

'platform': 'Python'

108

}

109

110

connection = Connection(

111

hostname="amqp.example.com",

112

sasl=auth,

113

container_id="my-app-container",

114

max_frame_size=65536,

115

channel_max=100,

116

idle_timeout=60000, # 60 seconds

117

properties=properties,

118

debug=True

119

)

120

```

121

122

### Session Management

123

124

AMQP session management that provides logical grouping and flow control for links within a connection.

125

126

```python { .api }

127

class Session:

128

def __init__(self, connection, incoming_window=None, outgoing_window=None,

129

handle_max=None):

130

"""

131

AMQP session management.

132

133

Parameters:

134

- connection (Connection): AMQP connection

135

- incoming_window (int): Incoming transfer window size

136

- outgoing_window (int): Outgoing transfer window size

137

- handle_max (int): Maximum link handles

138

"""

139

```

140

141

**Key Methods:**

142

143

```python { .api }

144

def begin(self):

145

"""Begin the AMQP session."""

146

147

def end(self):

148

"""End the AMQP session."""

149

150

def destroy(self):

151

"""Destroy the session and free resources."""

152

```

153

154

**Key Properties:**

155

156

```python { .api }

157

@property

158

def incoming_window: int

159

"""Incoming transfer window size."""

160

161

@property

162

def outgoing_window: int

163

"""Outgoing transfer window size."""

164

165

@property

166

def handle_max: int

167

"""Maximum number of link handles."""

168

169

@property

170

def connection: Connection

171

"""Associated AMQP connection."""

172

```

173

174

**Usage Examples:**

175

176

```python

177

from uamqp import Connection, Session
from uamqp.authentication import SASLPlain

# Credentials are defined here so the snippet runs on its own.
auth = SASLPlain("amqp.example.com", "user", "password")

# Create session on connection
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

# Bound before the try block so the cleanup in `finally` never hits an
# unbound name if Session(...) itself raises.
session = None
try:
    # Create session with flow control
    session = Session(
        connection=connection,
        incoming_window=1000,  # Allow 1000 incoming transfers
        outgoing_window=1000,  # Allow 1000 outgoing transfers
        handle_max=64,  # Support up to 64 concurrent links
    )

    session.begin()
    print("Session started")

    # Session is ready for link creation
    # ... create senders and receivers

finally:
    if session is not None:
        session.end()
    connection.close()

# Multiple sessions on one connection.
# The connection above has been closed, so open a fresh one first.
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

session1 = None
session2 = None
try:
    session1 = Session(connection, incoming_window=500)
    session2 = Session(connection, incoming_window=500)

    session1.begin()
    session2.begin()

    # Use sessions for different purposes
    # session1 for sending, session2 for receiving

finally:
    # End whichever sessions were created, then release the connection.
    for opened in (session1, session2):
        if opened is not None:
            opened.end()
    connection.close()

211

```

212

213

### Advanced Connection Patterns

214

215

#### Connection Pooling

216

217

```python

218

import threading

219

from queue import Queue

220

221

class ConnectionPool:
    """Fixed-size pool of pre-opened AMQP connections.

    The constructor creates and opens ``pool_size`` connections and stores
    them in a bounded ``Queue``. Callers borrow one with ``get_connection``
    and hand it back with ``return_connection``.
    """

    def __init__(self, hostname, auth, pool_size=5):
        """Create the pool and eagerly open ``pool_size`` connections.

        Parameters:
        - hostname (str): broker hostname passed to every ``Connection``
        - auth: SASL authentication object passed as ``sasl=``
        - pool_size (int): number of connections to pre-open

        Raises:
        - whatever ``Connection(...)`` or ``open()`` raises; connections that
          were already pooled are closed before the error propagates.
        """
        self.hostname = hostname
        self.auth = auth
        self.pool_size = pool_size
        self.connections = Queue(maxsize=pool_size)
        # Retained for backward compatibility with code that reads
        # ``pool.lock``; the Queue already serializes its own operations.
        self.lock = threading.Lock()

        # Pre-create connections
        try:
            for _ in range(pool_size):
                conn = Connection(hostname, sasl=auth)
                conn.open()
                self.connections.put(conn)
        except Exception:
            # Do not leak the connections that did open successfully.
            self.close_all()
            raise

    def get_connection(self, timeout=None):
        """Get a connection from the pool.

        Parameters:
        - timeout (float | None): seconds to wait for a free connection;
          ``None`` (the default) blocks until one is available.

        Raises:
        - queue.Empty: if ``timeout`` elapses with no connection available.
        """
        return self.connections.get(timeout=timeout)

    def return_connection(self, connection):
        """Return a connection to the pool.

        If the pool is already full the surplus connection is closed instead
        of being silently dropped.
        """
        from queue import Full

        try:
            # put_nowait avoids the check-then-act race of full() + put().
            self.connections.put_nowait(connection)
        except Full:
            connection.close()

    def close_all(self):
        """Close all connections currently held in the pool."""
        from queue import Empty

        while True:
            try:
                conn = self.connections.get_nowait()
            except Empty:
                break
            conn.close()

249

250

# Usage

251

pool = ConnectionPool("amqp.example.com", auth, pool_size=10)

252

253

# Get connection from pool

254

connection = pool.get_connection()

255

try:

256

session = Session(connection)

257

session.begin()

258

# Use session...

259

session.end()

260

finally:

261

pool.return_connection(connection)

262

```

263

264

#### Connection Monitoring

265

266

```python

267

import time

268

import threading

269

270

class ConnectionMonitor:
    """Background thread that periodically services a connection.

    Every ``check_interval`` seconds the monitor calls ``connection.work()``
    and, if the liveness check fails, closes and reopens the connection.

    NOTE(review): the monitor calls into the connection from its own thread;
    callers using the same connection elsewhere need their own
    synchronization.
    """

    def __init__(self, connection, check_interval=30):
        """
        Parameters:
        - connection: object exposing ``work()``, ``open()`` and ``close()``
        - check_interval (float): seconds between health checks
        """
        self.connection = connection
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Lets stop_monitoring() interrupt the wait between checks instead of
        # blocking for up to check_interval seconds.
        self._stop_event = threading.Event()

    def start_monitoring(self):
        """Start connection health monitoring (no-op if already running)."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop connection monitoring and wait for the thread to exit."""
        self.running = False
        self._stop_event.set()
        thread = self.monitor_thread
        # Joining the current thread would raise, so skip it when called
        # from inside the monitor loop itself.
        if thread is not None and thread is not threading.current_thread():
            thread.join()

    def _monitor_loop(self):
        """Monitor connection health until ``running`` becomes False."""
        while self.running:
            try:
                # Service the connection
                self.connection.work()

                # Check if connection is still alive
                if not self._is_connection_alive():
                    print("Connection lost, attempting reconnect...")
                    self._reconnect()

                delay = self.check_interval
            except Exception as e:
                print(f"Monitor error: {e}")
                delay = 1  # short pause before retrying after an error

            # Interruptible sleep: returns early once stop_monitoring() runs.
            self._stop_event.wait(delay)

    def _is_connection_alive(self):
        """Check if connection is still alive.

        Placeholder: always reports the connection as alive. Replace with a
        real check when one is available.
        """
        return True

    def _reconnect(self):
        """Attempt to reconnect by closing and reopening the connection."""
        try:
            self.connection.close()
            self.connection.open()
            print("Reconnection successful")
        except Exception as e:
            print(f"Reconnection failed: {e}")

324

325

# Usage

326

monitor = ConnectionMonitor(connection, check_interval=10)

327

monitor.start_monitoring()

328

329

# Use connection...

330

331

monitor.stop_monitoring()

332

```

333

334

### Session Flow Control

335

336

#### Credit-Based Flow Control

337

338

```python

339

from uamqp import Session, MessageReceiver

340

341

def create_controlled_receiver(session, source, credits=10):
    """Open a receiver on ``source`` and grant it an initial credit allowance.

    Parameters:
    - session: session the receiver link is created on
    - source: source address to receive from
    - credits (int): number of credits passed to the initial ``flow`` call

    Returns the opened receiver.
    """
    link = MessageReceiver(session, source)
    link.open()
    link.flow(credits)  # initial credit grant
    return link

351

352

def process_with_flow_control(session, source, max_messages=100, batch_size=5):
    """Process messages with explicit flow control.

    Parameters:
    - session: session used to create the receiver
    - source: source address to receive from
    - max_messages (int): stop requesting batches once this many messages
      have been processed (a batch already received is always finished, so
      the final count may exceed this by up to ``batch_size - 1``)
    - batch_size (int): initial credits and the size requested per batch

    Returns the number of messages processed. The receiver is always closed
    on exit.
    """
    receiver = create_controlled_receiver(session, source, credits=batch_size)

    processed = 0
    try:
        # NOTE(review): if receive_message_batch can return an empty batch
        # without blocking, this loop will spin; confirm against the
        # receiver's behavior.
        while processed < max_messages:
            messages = receiver.receive_message_batch(batch_size)

            for message in messages:
                # Process message
                print(f"Processing: {message.get_data()}")
                message.accept()
                processed += 1

            # Replenish exactly the credits that were consumed by this batch
            if len(messages) > 0:
                receiver.flow(len(messages))
    finally:
        receiver.close()

    return processed

374

```

375

376

#### Window Management

377

378

```python

379

def create_windowed_session(connection, window_size=1000):
    """Build and begin a session whose two transfer windows share one size.

    Parameters:
    - connection: connection the session is created on
    - window_size (int): value used for both the incoming and outgoing window

    Returns the session after ``begin()`` has been called on it.
    """
    window_options = {
        "incoming_window": window_size,
        "outgoing_window": window_size,
    }
    new_session = Session(connection=connection, **window_options)
    new_session.begin()
    return new_session

390

391

def monitor_session_windows(session, interval=5, threshold=0.8, iterations=None):
    """Monitor session window utilization.

    Prints the used/total size of both windows on every pass and a warning
    when usage exceeds ``threshold`` of the window.

    Parameters:
    - session: object exposing ``incoming_window``, ``outgoing_window``,
      ``available_incoming`` and ``available_outgoing``
    - interval (float): seconds to sleep between passes
    - threshold (float): fraction of the window above which a warning prints
    - iterations (int | None): number of passes to run; ``None`` (the
      default) runs forever
    """
    # Imported locally so the snippet does not depend on another example's
    # imports.
    import time

    completed = 0
    while iterations is None or completed < iterations:
        if completed:
            time.sleep(interval)

        incoming_used = session.incoming_window - session.available_incoming
        outgoing_used = session.outgoing_window - session.available_outgoing

        print(f"Incoming window: {incoming_used}/{session.incoming_window}")
        print(f"Outgoing window: {outgoing_used}/{session.outgoing_window}")

        # Alert if windows are getting full
        if incoming_used > session.incoming_window * threshold:
            print("Warning: Incoming window nearly full")

        if outgoing_used > session.outgoing_window * threshold:
            print("Warning: Outgoing window nearly full")

        completed += 1

409

```

410

411

### Connection Recovery

412

413

#### Automatic Reconnection

414

415

```python

416

import time

417

from uamqp.errors import AMQPConnectionError

418

419

class ReliableConnection:
    """Connection holder that retries with exponential backoff.

    ``connect`` makes up to ``max_retries`` attempts, sleeping ``2 ** attempt``
    seconds between failures. ``ensure_connected`` reconnects when the held
    connection no longer responds to ``work()``.
    """

    def __init__(self, hostname, auth, max_retries=5):
        """
        Parameters:
        - hostname (str): broker hostname
        - auth: SASL authentication object passed as ``sasl=``
        - max_retries (int): maximum connection attempts per ``connect`` call
        """
        self.hostname = hostname
        self.auth = auth
        self.max_retries = max_retries
        self.connection = None

    def connect(self):
        """Connect with automatic retry.

        Returns the opened connection. At least one attempt is always made,
        so the method never returns ``None``.

        Raises:
        - AMQPConnectionError: re-raised from the final failed attempt.
        """
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                candidate = Connection(self.hostname, sasl=self.auth)
                candidate.open()
            except AMQPConnectionError as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt == attempts - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                # Only publish the connection once it has actually opened, so
                # self.connection never refers to a failed attempt.
                self.connection = candidate
                print(f"Connected on attempt {attempt + 1}")
                return candidate

    def ensure_connected(self):
        """Ensure connection is active, reconnect if needed."""
        if self.connection is not None and self._is_connected():
            return self.connection

        print("Connection lost, reconnecting...")
        stale = self.connection
        self.connection = None
        if stale is not None:
            try:
                # Best-effort release of the old connection; a failure here
                # must not prevent the reconnect attempt.
                stale.close()
            except Exception as e:
                print(f"Ignoring error while closing stale connection: {e}")
        return self.connect()

    def _is_connected(self):
        """Check if connection is still active.

        Treats any ``Exception`` from ``work()`` as "not connected";
        KeyboardInterrupt and SystemExit propagate.
        """
        try:
            # Simple connectivity check
            self.connection.work()
            return True
        except Exception:
            return False

458

459

# Usage

460

reliable_conn = ReliableConnection("amqp.example.com", auth)

461

connection = reliable_conn.connect()

462

463

# Ensure connection before creating session

464

connection = reliable_conn.ensure_connected()

465

session = Session(connection)

466

```

467

468

## Performance Optimization

469

470

### Connection Tuning

471

472

```python

473

# High-throughput connection configuration

474

connection = Connection(

475

hostname="amqp.example.com",

476

sasl=auth,

477

max_frame_size=65536, # Maximum frame size

478

channel_max=1000, # Many concurrent sessions

479

idle_timeout=300000, # 5 minute timeout

480

debug=False # Disable debug for performance

481

)

482

483

# Low-latency connection configuration

484

connection = Connection(

485

hostname="amqp.example.com",

486

sasl=auth,

487

max_frame_size=4096, # Smaller frames for faster processing

488

channel_max=10, # Fewer sessions

489

idle_timeout=30000, # 30 second timeout

490

debug=False

491

)

492

```

493

494

### Session Optimization

495

496

```python

497

# High-throughput session

498

session = Session(

499

connection=connection,

500

incoming_window=10000, # Large window for batching

501

outgoing_window=10000,

502

handle_max=100 # Many concurrent links

503

)

504

505

# Low-latency session

506

session = Session(

507

connection=connection,

508

incoming_window=1, # Minimal window for immediate processing

509

outgoing_window=1,

510

handle_max=10 # Fewer links

511

)

512

```

513

514

## Thread Safety

515

516

Connection and session objects are not thread-safe. Use proper synchronization when accessing them from multiple threads:

517

518

```python

519

import threading

520

521

# Thread-safe connection wrapper

522

class ThreadSafeConnection:
    """Wrapper that serializes access to a connection with a re-entrant lock.

    Only calls made through this wrapper are serialized; code that uses
    ``self.connection`` directly bypasses the lock.
    """

    def __init__(self, connection):
        """
        Parameters:
        - connection: the connection object to guard
        """
        self.connection = connection
        # Re-entrant so a thread already holding the lock can call back in.
        self.lock = threading.RLock()

    def open(self):
        """Open the wrapped connection while holding the lock."""
        with self.lock:
            return self.connection.open()

    def close(self):
        """Close the wrapped connection while holding the lock."""
        with self.lock:
            return self.connection.close()

    def work(self):
        """Run ``connection.work()`` while holding the lock."""
        with self.lock:
            return self.connection.work()

    def create_session(self, **kwargs):
        """Create a ``Session`` on the wrapped connection under the lock.

        Keyword arguments are forwarded to ``Session``.
        """
        with self.lock:
            return Session(self.connection, **kwargs)

534

535

# Usage

536

safe_conn = ThreadSafeConnection(connection)

537

```