
# Async Operations

The Azure Event Grid Python SDK provides full asynchronous support for both publisher and consumer operations. The async clients expose APIs identical to their synchronous counterparts but use `async`/`await` for non-blocking operations.
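The async variants live in the `azure.eventgrid.aio` namespace. As a minimal sketch (assuming the synchronous clients are importable directly from `azure.eventgrid`, as is usual for this SDK), switching to async is mostly a matter of the import path and awaiting calls:

```python
# Synchronous client: calls block until the service responds
from azure.eventgrid import EventGridPublisherClient as SyncPublisherClient

# Asynchronous client: same constructor and method names, but awaitable
from azure.eventgrid.aio import EventGridPublisherClient as AsyncPublisherClient
```

The remaining examples on this page use the async variants exclusively.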

## Capabilities


### Async Publisher Client


Asynchronous event publishing for high-throughput scenarios and non-blocking operations.

```python { .api }
class EventGridPublisherClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AzureSasCredential, AsyncTokenCredential],
        *,
        namespace_topic: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def send(
        self,
        events: Union[CloudEvent, EventGridEvent, dict, List[Union[CloudEvent, EventGridEvent, dict]]],
        *,
        channel_name: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Asynchronously send events to Event Grid.

        Parameters:
        - events: Single event or list of events to send
        - channel_name: Channel name for multi-channel publishing (namespaces only)
        - content_type: Override content type for the request
        """

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None:
        """Asynchronously close the client and cleanup resources."""

    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc_details: Any) -> None: ...
```
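Because the constructor also accepts an `AsyncTokenCredential`, the async publisher can authenticate with Microsoft Entra ID. A minimal sketch, assuming the async `DefaultAzureCredential` from the `azure-identity` package and the same placeholder namespace endpoint used elsewhere on this page:

```python
import asyncio
from azure.identity.aio import DefaultAzureCredential
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.messaging import CloudEvent

async def publish_with_entra_id():
    # The async credential is itself a context manager and should be closed
    async with DefaultAzureCredential() as credential:
        async with EventGridPublisherClient(
            endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
            credential=credential,
            namespace_topic="orders-topic",
        ) as publisher:
            await publisher.send(
                CloudEvent(source="orders-service", type="Order.Created", data={"order_id": "1"})
            )

asyncio.run(publish_with_entra_id())
```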


### Async Consumer Client


Asynchronous event consumption with all management operations for Event Grid Namespaces.

```python { .api }
class EventGridConsumerClient:
    def __init__(
        self,
        endpoint: str,
        credential: Union[AzureKeyCredential, AsyncTokenCredential],
        *,
        namespace_topic: str,
        subscription: str,
        api_version: Optional[str] = None,
        **kwargs: Any
    ) -> None: ...

    async def receive(
        self,
        *,
        max_events: Optional[int] = None,
        max_wait_time: Optional[int] = None,
        **kwargs: Any
    ) -> List[ReceiveDetails]:
        """
        Asynchronously receive a batch of CloudEvents from the subscription.

        Parameters:
        - max_events: Maximum number of events to receive
        - max_wait_time: Maximum wait time in seconds

        Returns:
        List[ReceiveDetails]: List of received events with broker properties
        """

    async def acknowledge(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> AcknowledgeResult:
        """Asynchronously acknowledge successfully processed events."""

    async def release(
        self,
        *,
        lock_tokens: List[str],
        release_delay: Optional[Union[str, ReleaseDelay]] = None,
        **kwargs: Any
    ) -> ReleaseResult:
        """Asynchronously release events back to the subscription for reprocessing."""

    async def reject(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RejectResult:
        """Asynchronously reject events that cannot be processed."""

    async def renew_locks(
        self,
        *,
        lock_tokens: List[str],
        **kwargs: Any
    ) -> RenewLocksResult:
        """Asynchronously renew locks on events to extend processing time."""

    async def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> AsyncHttpResponse: ...

    async def close(self) -> None: ...
    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc_details: Any) -> None: ...
```
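The usage examples below cover the common settlement paths (acknowledge, release, reject); `renew_locks` is useful when a single event may take longer to process than the lock duration. A minimal sketch, where the handler coroutine, its True/False return convention, and the 30-second renewal interval are illustrative assumptions rather than SDK-defined values:

```python
import asyncio

async def process_with_lock_renewal(consumer, event_detail, handler, renew_interval=30):
    """Run `handler(event)` while periodically renewing the event lock, then settle."""
    lock_token = event_detail.broker_properties.lock_token
    handler_task = asyncio.ensure_future(handler(event_detail.event))

    # Keep the lock alive until the handler finishes
    while not handler_task.done():
        await asyncio.wait([handler_task], timeout=renew_interval)
        if not handler_task.done():
            await consumer.renew_locks(lock_tokens=[lock_token])

    try:
        succeeded = handler_task.result()
    except Exception:
        succeeded = False

    if succeeded:
        await consumer.acknowledge(lock_tokens=[lock_token])
    else:
        await consumer.reject(lock_tokens=[lock_token])
```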


## Usage Examples


### Async Event Publishing

```python
import asyncio
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

async def publish_events():
    """Asynchronously publish events to an Event Grid Namespace."""

    # Create async publisher client
    async with EventGridPublisherClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic"
    ) as publisher:

        # Create events
        events = [
            CloudEvent(
                source="orders-service",
                type="Order.Created",
                data={"order_id": f"order-{i}", "total": i * 10.99}
            )
            for i in range(1, 6)
        ]

        # Send events asynchronously
        await publisher.send(events)
        print(f"Published {len(events)} events")

# Run the async function
asyncio.run(publish_events())
```


### Async Event Consumption

```python
import asyncio
from azure.eventgrid.aio import EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.models import ReleaseDelay

async def consume_events():
    """Asynchronously consume and process events."""

    async with EventGridConsumerClient(
        endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
        credential=AzureKeyCredential("access_key"),
        namespace_topic="orders-topic",
        subscription="order-processor"
    ) as consumer:

        # Receive events
        events = await consumer.receive(max_events=10, max_wait_time=30)

        if not events:
            print("No events received")
            return

        print(f"Received {len(events)} events")

        # Process events concurrently
        tasks = []
        for event_detail in events:
            task = process_event_async(event_detail)
            tasks.append(task)

        # Wait for all processing to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Group results by outcome
        success_tokens = []
        failed_tokens = []

        for i, result in enumerate(results):
            lock_token = events[i].broker_properties.lock_token

            if isinstance(result, Exception):
                print(f"Processing failed: {result}")
                failed_tokens.append(lock_token)
            elif result:
                success_tokens.append(lock_token)
            else:
                failed_tokens.append(lock_token)

        # Acknowledge successful events
        if success_tokens:
            ack_result = await consumer.acknowledge(lock_tokens=success_tokens)
            print(f"Acknowledged {len(ack_result.succeeded_lock_tokens)} events")

        # Release failed events
        if failed_tokens:
            release_result = await consumer.release(
                lock_tokens=failed_tokens,
                release_delay=ReleaseDelay.ONE_MINUTE
            )
            print(f"Released {len(release_result.succeeded_lock_tokens)} events")

async def process_event_async(event_detail):
    """Simulate async event processing."""
    try:
        # Simulate async work
        await asyncio.sleep(0.1)

        # Process the event
        cloud_event = event_detail.event
        print(f"Processing event: {cloud_event.type} from {cloud_event.source}")

        # Simulate processing logic
        if "error" in str(cloud_event.data).lower():
            return False  # Processing failed

        return True  # Processing succeeded

    except Exception as e:
        print(f"Unexpected error: {e}")
        return False

# Run the consumer
asyncio.run(consume_events())
```


### Concurrent Publishing

```python
import asyncio
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

async def publish_to_multiple_topics():
    """Publish events to multiple topics concurrently."""

    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"

    # Create multiple publisher clients
    publishers = [
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic1"),
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic2"),
        EventGridPublisherClient(endpoint, credential, namespace_topic="topic3")
    ]

    try:
        # Create events for each topic
        events_per_topic = [
            [CloudEvent(source="app", type="Type1", data={"topic": "topic1", "id": i}) for i in range(5)],
            [CloudEvent(source="app", type="Type2", data={"topic": "topic2", "id": i}) for i in range(5)],
            [CloudEvent(source="app", type="Type3", data={"topic": "topic3", "id": i}) for i in range(5)]
        ]

        # Publish to all topics concurrently
        tasks = []
        for publisher, events in zip(publishers, events_per_topic):
            task = publisher.send(events)
            tasks.append(task)

        # Wait for all publishing to complete
        await asyncio.gather(*tasks)
        print("Published events to all topics")

    finally:
        # Close all publishers
        close_tasks = [publisher.close() for publisher in publishers]
        await asyncio.gather(*close_tasks)

asyncio.run(publish_to_multiple_topics())
```


### Event Processing Pipeline

```python
import asyncio
from azure.eventgrid.aio import EventGridConsumerClient, EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

class EventProcessor:
    """Async event processing pipeline."""

    def __init__(self, endpoint, credential):
        self.endpoint = endpoint
        self.credential = credential

    async def run_pipeline(self):
        """Run continuous event processing pipeline."""

        # Input consumer
        input_consumer = EventGridConsumerClient(
            self.endpoint, self.credential,
            namespace_topic="input-topic",
            subscription="processor"
        )

        # Output publisher
        output_publisher = EventGridPublisherClient(
            self.endpoint, self.credential,
            namespace_topic="output-topic"
        )

        async with input_consumer, output_publisher:
            while True:
                try:
                    # Receive events
                    events = await input_consumer.receive(max_events=5, max_wait_time=10)

                    if not events:
                        await asyncio.sleep(1)
                        continue

                    # Process events concurrently
                    processed_events = await self.process_events(events)

                    # Publish processed events
                    if processed_events:
                        await output_publisher.send(processed_events)

                    # Acknowledge input events
                    lock_tokens = [e.broker_properties.lock_token for e in events]
                    await input_consumer.acknowledge(lock_tokens=lock_tokens)

                    print(f"Processed {len(events)} events")

                except KeyboardInterrupt:
                    print("Shutting down pipeline...")
                    break
                except Exception as e:
                    print(f"Pipeline error: {e}")
                    await asyncio.sleep(5)  # Brief pause before retry

    async def process_events(self, events):
        """Process events concurrently."""
        tasks = [self.transform_event(event_detail) for event_detail in events]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out failed transformations
        processed_events = []
        for result in results:
            if not isinstance(result, Exception) and result:
                processed_events.append(result)

        return processed_events

    async def transform_event(self, event_detail):
        """Transform a single event."""
        try:
            # Simulate async transformation
            await asyncio.sleep(0.1)

            input_event = event_detail.event

            # Create transformed event
            output_event = CloudEvent(
                source="event-processor",
                type=f"Processed.{input_event.type}",
                data={
                    "original_data": input_event.data,
                    "processed_at": asyncio.get_event_loop().time(),
                    "processor_id": "async-processor-1"
                }
            )

            return output_event

        except Exception as e:
            print(f"Transformation failed: {e}")
            return None

# Run the pipeline
processor = EventProcessor(
    endpoint="https://my-namespace.westus-1.eventgrid.azure.net",
    credential=AzureKeyCredential("access_key")
)

asyncio.run(processor.run_pipeline())
```


### Async Context Manager Patterns

```python
import asyncio
from contextlib import AsyncExitStack

from azure.eventgrid.aio import EventGridPublisherClient, EventGridConsumerClient
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent

async def context_manager_examples():
    """Demonstrate async context manager usage patterns."""

    credential = AzureKeyCredential("access_key")
    endpoint = "https://my-namespace.westus-1.eventgrid.azure.net"

    # Single client context manager
    async with EventGridPublisherClient(
        endpoint, credential, namespace_topic="topic"
    ) as publisher:
        await publisher.send([CloudEvent(source="app", type="Test", data={})])

    # Multiple clients with AsyncExitStack
    async with AsyncExitStack() as stack:
        # Enter multiple async context managers
        publisher = await stack.enter_async_context(
            EventGridPublisherClient(endpoint, credential, namespace_topic="output")
        )
        consumer = await stack.enter_async_context(
            EventGridConsumerClient(
                endpoint, credential,
                namespace_topic="input",
                subscription="processor"
            )
        )

        # Use both clients
        events = await consumer.receive(max_events=5)
        if events:
            # Process and forward events
            processed = [transform_event(e) for e in events]
            await publisher.send(processed)

            # Acknowledge input events
            tokens = [e.broker_properties.lock_token for e in events]
            await consumer.acknowledge(lock_tokens=tokens)

    # All clients automatically closed on exit

def transform_event(event_detail):
    """Simple event transformation."""
    return CloudEvent(
        source="transformer",
        type=f"Transformed.{event_detail.event.type}",
        data={"original": event_detail.event.data}
    )

asyncio.run(context_manager_examples())
```


### Error Handling with Async Operations

```python
import asyncio
from azure.core.exceptions import HttpResponseError, ClientAuthenticationError
from azure.eventgrid.aio import EventGridConsumerClient
from azure.eventgrid.models import ReleaseDelay

async def robust_async_processing():
    """Demonstrate robust error handling in async operations."""

    # process_event_async is defined in the Async Event Consumption example above
    async with EventGridConsumerClient(...) as consumer:
        while True:
            try:
                # Receive events with timeout
                events = await asyncio.wait_for(
                    consumer.receive(max_events=10),
                    timeout=30.0
                )

                if not events:
                    continue

                # Process with individual error handling
                success_tokens = []
                retry_tokens = []

                for event_detail in events:
                    try:
                        # Process with timeout
                        result = await asyncio.wait_for(
                            process_event_async(event_detail.event),
                            timeout=5.0
                        )

                        if result:
                            success_tokens.append(event_detail.broker_properties.lock_token)
                        else:
                            retry_tokens.append(event_detail.broker_properties.lock_token)

                    except asyncio.TimeoutError:
                        print("Event processing timed out")
                        retry_tokens.append(event_detail.broker_properties.lock_token)

                    except Exception as e:
                        print(f"Event processing failed: {e}")
                        retry_tokens.append(event_detail.broker_properties.lock_token)

                # Handle results concurrently
                tasks = []

                if success_tokens:
                    tasks.append(consumer.acknowledge(lock_tokens=success_tokens))

                if retry_tokens:
                    tasks.append(consumer.release(
                        lock_tokens=retry_tokens,
                        release_delay=ReleaseDelay.ONE_MINUTE
                    ))

                if tasks:
                    results = await asyncio.gather(*tasks, return_exceptions=True)
                    for result in results:
                        if isinstance(result, Exception):
                            print(f"Operation failed: {result}")

            except asyncio.TimeoutError:
                print("Receive operation timed out, continuing...")

            except ClientAuthenticationError as e:
                print(f"Authentication failed: {e}")
                break  # Cannot continue without valid auth

            except HttpResponseError as e:
                print(f"HTTP error: {e.status_code} - {e.message}")
                if e.status_code >= 500:
                    # Server error, wait and retry
                    await asyncio.sleep(5)
                else:
                    # Client error, may need intervention
                    break

            except Exception as e:
                print(f"Unexpected error: {e}")
                await asyncio.sleep(1)

asyncio.run(robust_async_processing())
```


## Performance Considerations


### Concurrency Control

```python
import asyncio
from asyncio import Semaphore

async def controlled_concurrent_processing(consumer, max_concurrent=10):
    """Process events with controlled concurrency."""

    semaphore = Semaphore(max_concurrent)

    async def process_with_semaphore(event_detail):
        async with semaphore:
            return await process_event_async(event_detail)

    events = await consumer.receive(max_events=50)

    # Process with limited concurrency
    tasks = [process_with_semaphore(event) for event in events]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle results...
```


### Batch Processing Optimization

```python
import asyncio

async def optimized_batch_processing(consumer, batch_size=20):
    """Optimized async batch processing."""

    while True:
        # Receive larger batches
        events = await consumer.receive(max_events=batch_size, max_wait_time=10)

        if not events:
            await asyncio.sleep(0.1)  # Brief pause
            continue

        # Process in smaller concurrent chunks
        chunk_size = 5
        for i in range(0, len(events), chunk_size):
            chunk = events[i:i + chunk_size]

            # Process chunk concurrently
            tasks = [process_event_async(event) for event in chunk]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Handle chunk results immediately
            success_tokens = []
            failed_tokens = []

            for j, result in enumerate(results):
                lock_token = chunk[j].broker_properties.lock_token
                if isinstance(result, Exception) or not result:
                    failed_tokens.append(lock_token)
                else:
                    success_tokens.append(lock_token)

            # Settle this chunk's outcomes concurrently
            ops = []
            if success_tokens:
                ops.append(consumer.acknowledge(lock_tokens=success_tokens))
            if failed_tokens:
                ops.append(consumer.release(lock_tokens=failed_tokens))

            if ops:
                await asyncio.gather(*ops, return_exceptions=True)
```