# Event Consumption

Event consumption in Azure Event Grid Namespaces enables receiving and managing events from subscriptions with pull delivery. The EventGridConsumerClient provides operations for receiving, acknowledging, releasing, rejecting, and renewing locks on events.

## Capabilities

### Consumer Client Creation

Creates a consumer client for receiving events from Event Grid Namespace subscriptions.

```python { .api }
class EventGridConsumerClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, TokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Create EventGrid consumer client.

        Parameters:
        - endpoint: Event Grid namespace endpoint URL
        - credential: Authentication credential
        - namespace_topic: Topic name to consume from
        - subscription: Subscription name to consume from
        - api_version: API version (default: "2024-06-01")
        """
```

### Event Reception

Receives a batch of Cloud Events from the subscription with configurable batch size and wait time.

```python { .api }
def receive(
    self,
    *,
    max_events: Optional[int] = None,
    max_wait_time: Optional[int] = None,
    **kwargs: Any
) -> List[ReceiveDetails]:
    """
    Receive batch of Cloud Events from subscription.

    Parameters:
    - max_events: Maximum number of events to receive (1-100, service default applies)
    - max_wait_time: Maximum wait time in seconds (10-120, service default applies)

    Returns:
    List[ReceiveDetails]: List of received events with broker properties

    Raises:
    - HttpResponseError: Server returned error response
    - ClientAuthenticationError: Authentication failed
    """
```

### Event Acknowledgment

Acknowledges successfully processed events, removing them from the subscription.

```python { .api }
def acknowledge(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> AcknowledgeResult:
    """
    Acknowledge successfully processed events.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    AcknowledgeResult: Result with succeeded and failed acknowledgments

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """
```

### Event Release

Releases events back to the subscription for reprocessing, optionally with a delay.

```python { .api }
def release(
    self,
    *,
    lock_tokens: List[str],
    release_delay: Optional[Union[int, ReleaseDelay]] = None,
    **kwargs: Any
) -> ReleaseResult:
    """
    Release events back to subscription for reprocessing.

    Parameters:
    - lock_tokens: List of lock tokens from received events
    - release_delay: Delay before event becomes available (ReleaseDelay enum or seconds as int)

    Returns:
    ReleaseResult: Result with succeeded and failed releases

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid lock tokens or release delay
    """
```
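
One way to choose `release_delay` is to escalate it with the delivery count, so an event that keeps failing is retried less often. The sketch below is an illustration under stated assumptions, not part of the SDK: `pick_release_delay_seconds` is a hypothetical helper, the thresholds are arbitrary, and it assumes the first delivery reports a delivery count of 1. It returns plain integers (10, 60, 600) corresponding to the `ReleaseDelay.TEN_SECONDS`, `ONE_MINUTE`, and `TEN_MINUTES` members used elsewhere in this document.

```python
def pick_release_delay_seconds(delivery_count: int) -> int:
    """Return a release delay in seconds that grows with the delivery count.

    Hypothetical helper: the thresholds are illustrative, not SDK defaults.
    """
    if delivery_count < 1:
        raise ValueError("delivery_count must be at least 1")
    if delivery_count == 1:
        return 10   # first failure: retry quickly
    if delivery_count == 2:
        return 60   # second failure: back off to one minute
    return 600      # later failures: back off to ten minutes


for count in (1, 2, 3, 5):
    print(count, pick_release_delay_seconds(count))
```

The result could then be passed as `release_delay=pick_release_delay_seconds(delivery_count)`, assuming the service accepts these integer values as the signature above suggests.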

### Event Rejection

Rejects events that cannot be processed, typically moving them to dead letter storage.

```python { .api }
def reject(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> RejectResult:
    """
    Reject events that cannot be processed.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    RejectResult: Result with succeeded and failed rejections

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """
```

### Lock Renewal

Extends the lock duration on events to continue processing beyond the initial lock timeout.

```python { .api }
def renew_locks(
    self,
    *,
    lock_tokens: List[str],
    **kwargs: Any
) -> RenewLocksResult:
    """
    Renew locks on events to extend processing time.

    Parameters:
    - lock_tokens: List of lock tokens from received events

    Returns:
    RenewLocksResult: Result with succeeded and failed lock renewals

    Raises:
    - HttpResponseError: Server returned error response
    - ValueError: Invalid or expired lock tokens
    """
```
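
When processing time is unpredictable, renewal can run on a background thread instead of being interleaved with the work. The following is a minimal sketch, not an SDK feature: `LockRenewer` is a hypothetical helper that takes any callable accepting a list of lock tokens, and the demonstration uses a stand-in function rather than a real client. The interval is an assumption and should be shorter than the subscription's lock duration.

```python
import threading
from typing import Callable, List


class LockRenewer:
    """Call `renew` with the given lock tokens at a fixed interval.

    Hypothetical helper, not part of the SDK. `renew` is any callable that
    accepts a list of lock tokens, for example
    `lambda tokens: consumer.renew_locks(lock_tokens=tokens)`.
    """

    def __init__(self, renew: Callable[[List[str]], object],
                 lock_tokens: List[str], interval_seconds: float) -> None:
        self._renew = renew
        self._lock_tokens = list(lock_tokens)
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait returns False on timeout, so the loop body runs once
        # per interval until __exit__ sets the event.
        while not self._stop.wait(self._interval):
            self._renew(self._lock_tokens)

    def __enter__(self) -> "LockRenewer":
        self._thread.start()
        return self

    def __exit__(self, *exc_details: object) -> None:
        self._stop.set()
        self._thread.join()


# Demonstration with a stand-in renew function instead of a real client.
renewed_batches: List[List[str]] = []
with LockRenewer(renewed_batches.append, ["token-1"], interval_seconds=0.01):
    threading.Event().wait(0.1)  # simulate work that outlasts several intervals

print(f"renew called {len(renewed_batches)} time(s)")
```

In this sketch an exception raised by `renew` ends the background thread silently, so a production version would need to catch and report renewal failures.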

### Low-Level HTTP Operations

Direct HTTP request handling for advanced scenarios.

```python { .api }
def send_request(
    self,
    request: HttpRequest,
    *,
    stream: bool = False,
    **kwargs: Any
) -> HttpResponse:
    """
    Send raw HTTP request through the client pipeline.

    Parameters:
    - request: The HTTP request to send
    - stream: Whether to stream the response payload

    Returns:
    HttpResponse: Raw HTTP response
    """
```

### Resource Management

Context manager support and explicit resource cleanup.

```python { .api }
def close(self) -> None:
    """Close the client and cleanup resources."""

def __enter__(self) -> Self:
    """Context manager entry."""

def __exit__(self, *exc_details: Any) -> None:
    """Context manager exit with cleanup."""
```
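
These methods follow the standard Python context manager protocol: `__enter__` returns the client, and `__exit__` runs cleanup even when the body of the `with` statement raises. The sketch below shows that behavior with a stand-in class. `FakeClient` is hypothetical and does no networking; it mirrors the signatures listed above rather than the SDK's actual implementation.

```python
from typing import Any


class FakeClient:
    """Stand-in with the same close/__enter__/__exit__ shape as above."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "FakeClient":
        return self

    def __exit__(self, *exc_details: Any) -> None:
        # Returning None lets any exception from the with-body propagate.
        self.close()


client = FakeClient()
try:
    with client:
        raise RuntimeError("processing failed")
except RuntimeError:
    pass

# close() ran even though the body raised.
print(client.closed)
```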

## Usage Examples

### Basic Event Consumption

```python
from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

# Create consumer client
consumer = EventGridConsumerClient(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key"),
    namespace_topic="orders-topic",
    subscription="order-processor"
)

# Receive events
events = consumer.receive(max_events=10, max_wait_time=60)

for event_detail in events:
    cloud_event = event_detail.event
    broker_props = event_detail.broker_properties

    print(f"Event ID: {cloud_event.id}")
    print(f"Event Type: {cloud_event.type}")
    print(f"Event Source: {cloud_event.source}")
    print(f"Event Data: {cloud_event.data}")
    print(f"Lock Token: {broker_props.lock_token}")
    print(f"Delivery Count: {broker_props.delivery_count}")

consumer.close()
```

### Event Processing with Acknowledgment

```python
from azure.eventgrid.models import ReleaseDelay

# Process and acknowledge events
events = consumer.receive(max_events=5)
processed_tokens = []
failed_tokens = []

for event_detail in events:
    try:
        # Process the event (process_order_event is an application-defined placeholder)
        result = process_order_event(event_detail.event)
        if result.success:
            processed_tokens.append(event_detail.broker_properties.lock_token)
        else:
            failed_tokens.append(event_detail.broker_properties.lock_token)
    except Exception as e:
        print(f"Processing failed: {e}")
        failed_tokens.append(event_detail.broker_properties.lock_token)

# Acknowledge successfully processed events
if processed_tokens:
    ack_result = consumer.acknowledge(lock_tokens=processed_tokens)
    print(f"Acknowledged: {len(ack_result.succeeded_lock_tokens)}")
    print(f"Failed to acknowledge: {len(ack_result.failed_lock_tokens)}")

# Release failed events for retry
if failed_tokens:
    release_result = consumer.release(
        lock_tokens=failed_tokens,
        release_delay=ReleaseDelay.TEN_SECONDS
    )
    print(f"Released: {len(release_result.succeeded_lock_tokens)}")
```

### Error Handling and Retries

```python
from azure.eventgrid.models import ReleaseDelay
from azure.core.exceptions import HttpResponseError


class ProcessingError(Exception):
    """Placeholder for an application-defined, retryable processing failure."""


events = consumer.receive(max_events=10)

for event_detail in events:
    cloud_event = event_detail.event
    lock_token = event_detail.broker_properties.lock_token
    delivery_count = event_detail.broker_properties.delivery_count

    try:
        # Attempt processing (process_event is an application-defined placeholder)
        process_event(cloud_event)

        # Acknowledge successful processing
        consumer.acknowledge(lock_tokens=[lock_token])

    except ProcessingError as e:
        if delivery_count < 3:
            # Retry with delay
            consumer.release(
                lock_tokens=[lock_token],
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released for retry (attempt {delivery_count + 1})")
        else:
            # Max retries exceeded, reject to dead letter
            consumer.reject(lock_tokens=[lock_token])
            print(f"Rejected after {delivery_count} attempts")

    except Exception as e:
        # Unexpected error, reject immediately
        consumer.reject(lock_tokens=[lock_token])
        print(f"Rejected due to unexpected error: {e}")
```

### Long-Running Processing with Lock Renewal

```python
import time
from azure.eventgrid.models import ReleaseDelay

events = consumer.receive(max_events=1)

for event_detail in events:
    lock_token = event_detail.broker_properties.lock_token

    try:
        # Start long-running processing
        for step in range(10):  # Simulate 10 processing steps
            # Renew lock every 30 seconds to prevent timeout
            if step > 0 and step % 3 == 0:
                renew_result = consumer.renew_locks(lock_tokens=[lock_token])
                if not renew_result.succeeded_lock_tokens:
                    print("Failed to renew lock, releasing event")
                    consumer.release(lock_tokens=[lock_token])
                    break

            # Process step (simulate 10 seconds per step)
            time.sleep(10)
            print(f"Completed step {step + 1}")
        else:
            # All steps completed successfully
            consumer.acknowledge(lock_tokens=[lock_token])
            print("Processing completed successfully")

    except Exception as e:
        # Processing failed, release for retry
        consumer.release(
            lock_tokens=[lock_token],
            release_delay=ReleaseDelay.TEN_MINUTES
        )
        print(f"Processing failed, released for retry: {e}")
```

### Context Manager Usage

```python
from azure.eventgrid import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential

# Automatic resource cleanup
with EventGridConsumerClient(
    endpoint="https://namespace.region.eventgrid.azure.net",
    credential=AzureKeyCredential("key"),
    namespace_topic="topic",
    subscription="sub"
) as consumer:

    events = consumer.receive(max_events=5)

    # Process events (process_event is an application-defined placeholder)
    for event_detail in events:
        process_event(event_detail.event)

    # Acknowledge all events
    lock_tokens = [e.broker_properties.lock_token for e in events]
    consumer.acknowledge(lock_tokens=lock_tokens)

# Client automatically closed on exit
```

### Batch Processing Pattern

```python
import time

from azure.eventgrid import EventGridConsumerClient
from azure.eventgrid.models import ReleaseDelay


def process_event_batch(consumer, batch_size=10):
    """Process events in batches with proper error handling."""

    events = consumer.receive(max_events=batch_size, max_wait_time=30)

    if not events:
        print("No events received")
        return

    print(f"Received {len(events)} events")

    # Group events by processing outcome
    success_tokens = []
    retry_tokens = []
    reject_tokens = []

    for event_detail in events:
        try:
            # Process individual event (process_single_event is an
            # application-defined placeholder)
            result = process_single_event(event_detail.event)

            if result == "success":
                success_tokens.append(event_detail.broker_properties.lock_token)
            elif result == "retry":
                retry_tokens.append(event_detail.broker_properties.lock_token)
            else:
                reject_tokens.append(event_detail.broker_properties.lock_token)

        except Exception as e:
            print(f"Unexpected error processing event: {e}")
            reject_tokens.append(event_detail.broker_properties.lock_token)

    # Handle each group of events
    if success_tokens:
        ack_result = consumer.acknowledge(lock_tokens=success_tokens)
        print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")

    if retry_tokens:
        release_result = consumer.release(
            lock_tokens=retry_tokens,
            release_delay=ReleaseDelay.ONE_MINUTE
        )
        print(f"Released {len(release_result.succeeded_lock_tokens)} events for retry")

    if reject_tokens:
        reject_result = consumer.reject(lock_tokens=reject_tokens)
        print(f"Rejected {len(reject_result.succeeded_lock_tokens)} events")


# Usage
with EventGridConsumerClient(...) as consumer:
    while True:
        process_event_batch(consumer)
        time.sleep(5)  # Brief pause between batches
```
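
The `while True` loop above never exits on its own. A hedged alternative is to drive the loop from a stop flag so the consumer can shut down cleanly between batches. In the sketch below, `run_until_stopped` is a hypothetical helper, and the batch function is a stand-in so the example runs without a client.

```python
import threading
from typing import Callable, List


def run_until_stopped(process_batch: Callable[[], None],
                      stop: threading.Event,
                      pause_seconds: float = 5.0) -> int:
    """Call process_batch repeatedly until `stop` is set; return the batch count."""
    batches = 0
    while not stop.is_set():
        process_batch()
        batches += 1
        # wait() doubles as the pause between batches and returns early
        # as soon as stop is set.
        stop.wait(pause_seconds)
    return batches


# Demonstration: a stand-in batch function that requests shutdown after 3 runs.
stop_event = threading.Event()
calls: List[int] = []


def fake_batch() -> None:
    calls.append(len(calls) + 1)
    if len(calls) == 3:
        stop_event.set()


total = run_until_stopped(fake_batch, stop_event, pause_seconds=0.01)
print(total)
```

With a real client, `process_batch` could be `lambda: process_event_batch(consumer)`, and `stop_event.set()` could be called from a signal handler or another thread.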

## Error Handling

### Common Error Scenarios

```python
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError

try:
    events = consumer.receive(max_events=10)
    # Process events...
    lock_tokens = [detail.broker_properties.lock_token for detail in events]
    consumer.acknowledge(lock_tokens=lock_tokens)

except ClientAuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Check credentials and permissions

except HttpResponseError as e:
    if e.status_code == 404:
        print("Topic or subscription not found")
    elif e.status_code == 400:
        print(f"Bad request: {e.message}")
        # Check lock tokens and parameters
    elif e.status_code == 409:
        print("Lock tokens expired or invalid")
        # Events may have been processed by another consumer
    else:
        print(f"HTTP error {e.status_code}: {e.message}")

except ValueError as e:
    print(f"Invalid parameters: {e}")
    # Check lock tokens format and values
```

### Operation Result Handling

```python
# Handle partial success in batch operations
ack_result = consumer.acknowledge(lock_tokens=all_lock_tokens)

# Check for successful acknowledgments
if ack_result.succeeded_lock_tokens:
    print(f"Successfully acknowledged {len(ack_result.succeeded_lock_tokens)} events")

# Handle failed acknowledgments
if ack_result.failed_lock_tokens:
    print(f"Failed to acknowledge {len(ack_result.failed_lock_tokens)} events")
    for failed_token in ack_result.failed_lock_tokens:
        print(f"Failed token: {failed_token.lock_token}")
        print(f"Error: {failed_token.error}")

        # Decide whether to retry or reject based on error
        if "expired" in str(failed_token.error):
            print("Lock expired, event likely processed elsewhere")
        else:
            # Unexpected error, may need manual intervention
            print("Unexpected acknowledgment error")
```
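
The same triage can be factored into a small function so it is testable without a live subscription. This is a sketch under stated assumptions: `FakeFailedToken` is a hypothetical stand-in exposing only the `lock_token` and `error` attributes read above, `split_failed_tokens` is not an SDK function, and matching on the word "expired" is a heuristic (made case-insensitive here), not a documented error contract.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass
class FakeFailedToken:
    """Stand-in exposing the two attributes read in the example above."""
    lock_token: str
    error: str


def split_failed_tokens(
    failed: Iterable[FakeFailedToken],
) -> Tuple[List[str], List[str]]:
    """Return (expired, other) lock tokens based on the error text."""
    expired: List[str] = []
    other: List[str] = []
    for item in failed:
        # Heuristic: look for "expired" in the error's string form.
        if "expired" in str(item.error).lower():
            expired.append(item.lock_token)
        else:
            other.append(item.lock_token)
    return expired, other


expired, other = split_failed_tokens([
    FakeFailedToken("t1", "Lock token expired"),
    FakeFailedToken("t2", "Internal server error"),
])
print(expired, other)
```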