or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

administrative-operations.mdclient-management.mdconstants-enums.mdexception-handling.mdindex.mdmessage-operations.mdmessage-types.mdsession-management.md

session-management.mddocs/

0

# Session Management

1

2

Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.

3

4

## Capabilities

5

6

### Session-Enabled Receivers

7

8

Create receivers that work with sessions for stateful message processing.

9

10

```python { .api }

11

# From ServiceBusClient

12

def get_queue_receiver(

13

self,

14

queue_name: str,

15

*,

16

session_id: Optional[Union[str, NextAvailableSessionType]] = None,

17

**kwargs

18

) -> ServiceBusReceiver:

19

"""

20

Create a session-enabled queue receiver.

21

22

Parameters:

23

- queue_name: Name of the session-enabled queue

24

- session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session

25

"""

26

27

def get_subscription_receiver(

28

self,

29

topic_name: str,

30

subscription_name: str,

31

*,

32

session_id: Optional[Union[str, NextAvailableSessionType]] = None,

33

**kwargs

34

) -> ServiceBusReceiver:

35

"""

36

Create a session-enabled subscription receiver.

37

38

Parameters:

39

- topic_name: Name of the topic

40

- subscription_name: Name of the session-enabled subscription

41

- session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session

42

"""

43

```

44

45

#### Usage Example

46

47

```python

48

from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION

49

50

client = ServiceBusClient.from_connection_string("your_connection_string")

51

52

# Connect to a specific session

53

with client.get_queue_receiver("my-session-queue", session_id="session-123") as receiver:

54

session = receiver.session

55

print(f"Connected to session: {session.session_id}")

56

57

messages = receiver.receive_messages(max_message_count=10)

58

for message in messages:

59

print(f"Session message: {message.body}")

60

receiver.complete_message(message)

61

62

# Connect to next available session

63

with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:

64

if receiver.session:

65

print(f"Connected to available session: {receiver.session.session_id}")

66

# Process session messages

67

```

68

69

### ServiceBusSession Operations

70

71

Manage session state and properties for stateful messaging scenarios.

72

73

```python { .api }

74

class ServiceBusSession:

75

@property

76

def session_id(self) -> str:

77

"""

78

The unique identifier of the session.

79

80

Returns:

81

Session ID string

82

"""

83

84

@property

85

def locked_until_utc(self) -> Optional[datetime]:

86

"""

87

The time when the session lock expires.

88

89

Returns:

90

UTC datetime when session lock expires, or None if lock has expired

91

"""

92

93

@property

94

def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]:

95

"""

96

Error information if auto-renewal of session lock failed.

97

98

Returns:

99

Exception instance if auto-renewal failed, otherwise None

100

"""

101

102

def get_state(

103

self,

104

*,

105

timeout: Optional[float] = None,

106

**kwargs

107

) -> bytes:

108

"""

109

Get the session state data.

110

111

Parameters:

112

- timeout: Operation timeout in seconds

113

114

Returns:

115

Session state as bytes

116

117

Raises:

118

- SessionLockLostError: If session lock has expired

119

- ServiceBusError: For other Service Bus related errors

120

"""

121

122

def set_state(

123

self,

124

state: Optional[Union[str, bytes, bytearray]],

125

*,

126

timeout: Optional[float] = None,

127

**kwargs

128

) -> None:

129

"""

130

Set the session state data.

131

132

Parameters:

133

- state: Session state data (str, bytes, bytearray, or None to clear)

134

- timeout: Operation timeout in seconds

135

136

Raises:

137

- SessionLockLostError: If session lock has expired

138

- ServiceBusError: For other Service Bus related errors

139

"""

140

141

def renew_lock(

142

self,

143

*,

144

timeout: Optional[float] = None,

145

**kwargs

146

) -> datetime:

147

"""

148

Renew the session lock.

149

150

Parameters:

151

- timeout: Operation timeout in seconds

152

153

Returns:

154

New session lock expiration time

155

156

Raises:

157

- SessionLockLostError: If session lock has expired

158

- ServiceBusError: For other Service Bus related errors

159

"""

160

```

161

162

#### Usage Example

163

164

```python

165

from azure.servicebus import ServiceBusClient, ServiceBusMessage

166

import json

167

168

client = ServiceBusClient.from_connection_string("your_connection_string")

169

170

# Sending session messages

171

with client.get_queue_sender("my-session-queue") as sender:

172

# All messages with the same session_id will be delivered in order

173

for i in range(5):

174

message = ServiceBusMessage(

175

f"Order step {i}",

176

session_id="order-12345"

177

)

178

sender.send_messages(message)

179

180

# Processing session messages with state management

181

with client.get_queue_receiver("my-session-queue", session_id="order-12345") as receiver:

182

session = receiver.session

183

184

# Get current session state

185

try:

186

state_data = session.get_state()

187

if state_data:

188

order_state = json.loads(state_data.decode())

189

print(f"Current order state: {order_state}")

190

else:

191

order_state = {"processed_steps": [], "status": "pending"}

192

except Exception:

193

order_state = {"processed_steps": [], "status": "pending"}

194

195

# Process messages in order

196

messages = receiver.receive_messages(max_message_count=10, max_wait_time=30)

197

for message in messages:

198

print(f"Processing: {message.body}")

199

200

# Update state

201

order_state["processed_steps"].append(message.body)

202

if len(order_state["processed_steps"]) >= 5:

203

order_state["status"] = "completed"

204

205

# Save updated state

206

session.set_state(json.dumps(order_state).encode())

207

208

# Complete the message

209

receiver.complete_message(message)

210

211

print(f"Final order state: {order_state}")

212

```

213

214

### Automatic Lock Renewal

215

216

Use AutoLockRenewer to automatically maintain session locks during long-running processing.

217

218

```python { .api }

219

class AutoLockRenewer:

220

def __init__(

221

self,

222

max_lock_renewal_duration: float = 300,

223

on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,

224

executor: Optional[ThreadPoolExecutor] = None,

225

max_workers: Optional[int] = None

226

):

227

"""

228

Initialize AutoLockRenewer for automatic lock renewal.

229

230

Parameters:

231

- max_lock_renewal_duration: Maximum time to renew locks (seconds)

232

- on_lock_renew_failure: Callback function for renewal failures

233

- executor: Custom thread pool executor

234

- max_workers: Maximum number of worker threads

235

"""

236

237

def register(

238

self,

239

renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],

240

timeout: float = 300

241

) -> None:

242

"""

243

Register a session or message for automatic lock renewal.

244

245

Parameters:

246

- renewable: ServiceBusSession or ServiceBusReceivedMessage to auto-renew

247

- timeout: Maximum renewal duration in seconds

248

249

Raises:

250

- ValueError: If renewable is already registered or invalid

251

"""

252

253

def close(self) -> None:

254

"""

255

Stop all auto-renewal and cleanup resources.

256

"""

257

```

258

259

#### Usage Example

260

261

```python

262

from azure.servicebus import ServiceBusClient, AutoLockRenewer

263

import time

264

265

def on_lock_renew_failure(renewable, error):

266

print(f"Lock renewal failed for {renewable}: {error}")

267

268

client = ServiceBusClient.from_connection_string("your_connection_string")

269

auto_renewer = AutoLockRenewer(

270

max_lock_renewal_duration=600, # 10 minutes

271

on_lock_renew_failure=on_lock_renew_failure

272

)

273

274

with client.get_queue_receiver("my-session-queue", session_id="long-session") as receiver:

275

session = receiver.session

276

277

# Register session for auto-renewal

278

auto_renewer.register(session, timeout=600)

279

280

try:

281

# Long-running processing

282

messages = receiver.receive_messages(max_message_count=100, max_wait_time=60)

283

284

for message in messages:

285

# Register message for auto-renewal during processing

286

auto_renewer.register(message, timeout=300)

287

288

try:

289

# Simulate long processing time

290

print(f"Processing message: {message.body}")

291

time.sleep(30) # Long processing

292

293

receiver.complete_message(message)

294

print("Message completed successfully")

295

296

except Exception as e:

297

print(f"Error processing message: {e}")

298

receiver.abandon_message(message)

299

300

finally:

301

auto_renewer.close()

302

```

303

304

### Session Filtering

305

306

Constants and enums for session management.

307

308

```python { .api }

309

class ServiceBusSessionFilter(Enum):

310

"""Filter for selecting sessions."""

311

NEXT_AVAILABLE = 0

312

313

# Constant for convenience

314

NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter = ServiceBusSessionFilter.NEXT_AVAILABLE

315

```

316

317

### Session Error Handling

318

319

Specific exceptions related to session operations.

320

321

```python { .api }

322

class SessionLockLostError(ServiceBusError):

323

"""

324

The session lock has expired.

325

326

All unsettled messages that have been received can no longer be settled.

327

The session must be re-acquired to continue processing.

328

"""

329

330

class SessionCannotBeLockedError(ServiceBusError):

331

"""

332

The requested session cannot be locked.

333

334

This typically occurs when:

335

- The session is already locked by another client

336

- The session does not exist

337

- The session ID is invalid

338

"""

339

```

340

341

#### Usage Example

342

343

```python

344

from azure.servicebus import (

345

ServiceBusClient,

346

SessionLockLostError,

347

SessionCannotBeLockedError,

348

NEXT_AVAILABLE_SESSION

349

)

350

351

client = ServiceBusClient.from_connection_string("your_connection_string")

352

353

try:

354

with client.get_queue_receiver("my-session-queue", session_id="busy-session") as receiver:

355

session = receiver.session

356

messages = receiver.receive_messages()

357

358

for message in messages:

359

try:

360

# Process message

361

process_message_with_session_state(message, session)

362

receiver.complete_message(message)

363

364

except SessionLockLostError:

365

print("Session lock lost - session must be re-acquired")

366

break

367

368

except SessionCannotBeLockedError:

369

print("Session is locked by another client, trying next available session")

370

371

# Try to get any available session instead

372

with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:

373

if receiver.session:

374

print(f"Got available session: {receiver.session.session_id}")

375

# Process messages from this session

376

```

377

378

## Asynchronous Session Management

379

380

For asynchronous operations, use the async versions from the `azure.servicebus.aio` module.

381

382

```python { .api }

383

from azure.servicebus.aio import ServiceBusClient, ServiceBusReceiver, ServiceBusSession

384

385

class ServiceBusSession:

386

# Same properties as sync version

387

@property

388

def session_id(self) -> str: ...

389

@property

390

def locked_until_utc(self) -> Optional[datetime]: ...

391

@property

392

def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]: ...

393

394

# Async methods

395

async def get_state(self, *, timeout: Optional[float] = None, **kwargs) -> bytes: ...

396

async def set_state(self, state, *, timeout: Optional[float] = None, **kwargs) -> None: ...

397

async def renew_lock(self, *, timeout: Optional[float] = None, **kwargs) -> datetime: ...

398

```

399

400

#### Usage Example

401

402

```python

403

import asyncio

404

import json

405

from azure.servicebus.aio import ServiceBusClient

406

from azure.servicebus import ServiceBusMessage, NEXT_AVAILABLE_SESSION

407

408

async def process_session_messages():

409

async with ServiceBusClient.from_connection_string("your_connection_string") as client:

410

async with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:

411

session = receiver.session

412

if not session:

413

print("No available sessions")

414

return

415

416

print(f"Processing session: {session.session_id}")

417

418

# Get session state

419

state_data = await session.get_state()

420

session_state = json.loads(state_data.decode()) if state_data else {"count": 0}

421

422

# Process messages

423

async for message in receiver:

424

print(f"Received: {message.body}")

425

426

# Update session state

427

session_state["count"] += 1

428

await session.set_state(json.dumps(session_state).encode())

429

430

# Complete message

431

await receiver.complete_message(message)

432

433

# Break after 10 messages

434

if session_state["count"] >= 10:

435

break

436

437

asyncio.run(process_session_messages())

438

```