or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compute-services.mdindex.mdmessaging-integration.mdmonitoring-analytics.mdplatform-services.mdresource-management.mdsecurity-identity.md

messaging-integration.mddocs/

0

# Messaging and Integration

1

2

Azure messaging and integration services provide reliable message queuing, event distribution, and enterprise integration capabilities. This includes Service Bus for enterprise messaging and Event Grid for event-driven architectures.

3

4

## Capabilities

5

6

### Azure Service Bus

7

8

Provides reliable cloud messaging as a service and simple hybrid integration. Supports queues for point-to-point communication and topics/subscriptions for publish-subscribe patterns.

9

10

```python { .api }

11

class ServiceBusService:

12

"""

13

Client for Azure Service Bus operations.

14

15

Parameters:

16

- service_namespace: str, Service Bus namespace name (optional)

17

- account_key: str, Service Bus access key (optional)

18

- issuer: str, Service Bus issuer (optional)

19

- host_base: str, Service Bus host base URL (optional)

20

- timeout: int, Request timeout in seconds (optional)

21

"""

22

def __init__(self, service_namespace=None, account_key=None, issuer=None,

23

host_base=None, timeout=60, **kwargs): ...

24

25

# Queue operations

26

def create_queue(self, queue_name: str, queue=None, fail_on_exist=False): ...

27

def delete_queue(self, queue_name: str, fail_not_exist=False): ...

28

def get_queue(self, queue_name: str): ...

29

def list_queues(self): ...

30

def send_queue_message(self, queue_name: str, message): ...

31

def peek_lock_queue_message(self, queue_name: str, timeout=60): ...

32

def receive_queue_message(self, queue_name: str, peek_lock=True, timeout=60): ...

33

def delete_queue_message(self, queue_name: str, message): ...

34

def unlock_queue_message(self, queue_name: str, message): ...

35

def renew_lock_queue_message(self, queue_name: str, message): ...

36

37

# Topic operations

38

def create_topic(self, topic_name: str, topic=None, fail_on_exist=False): ...

39

def delete_topic(self, topic_name: str, fail_not_exist=False): ...

40

def get_topic(self, topic_name: str): ...

41

def list_topics(self): ...

42

def send_topic_message(self, topic_name: str, message): ...

43

44

# Subscription operations

45

def create_subscription(self, topic_name: str, subscription_name: str,

46

subscription=None, fail_on_exist=False): ...

47

def delete_subscription(self, topic_name: str, subscription_name: str,

48

fail_not_exist=False): ...

49

def get_subscription(self, topic_name: str, subscription_name: str): ...

50

def list_subscriptions(self, topic_name: str): ...

51

def receive_subscription_message(self, topic_name: str, subscription_name: str,

52

peek_lock=True, timeout=60): ...

53

def delete_subscription_message(self, topic_name: str, subscription_name: str, message): ...

54

def unlock_subscription_message(self, topic_name: str, subscription_name: str, message): ...

55

def renew_lock_subscription_message(self, topic_name: str, subscription_name: str, message): ...

56

57

# Rule operations

58

def create_rule(self, topic_name: str, subscription_name: str, rule_name: str,

59

rule=None, fail_on_exist=False): ...

60

def delete_rule(self, topic_name: str, subscription_name: str, rule_name: str,

61

fail_not_exist=False): ...

62

def get_rule(self, topic_name: str, subscription_name: str, rule_name: str): ...

63

def list_rules(self, topic_name: str, subscription_name: str): ...

64

```

65

66

### Service Bus Models

67

68

Data models for Service Bus entities and messages.

69

70

```python { .api }

71

class Queue:

72

"""Represents a Service Bus queue."""

73

def __init__(self): ...

74

75

lock_duration: str # Message lock duration

76

max_size_in_megabytes: int # Maximum queue size

77

requires_duplicate_detection: bool # Duplicate detection enabled

78

requires_session: bool # Session support enabled

79

default_message_time_to_live: str # Default TTL

80

dead_lettering_on_message_expiration: bool # Dead letter on expiration

81

duplicate_detection_history_time_window: str # Duplicate detection window

82

max_delivery_count: int # Maximum delivery attempts

83

enable_batched_operations: bool # Batched operations enabled

84

size_in_bytes: int # Current size in bytes

85

message_count: int # Current message count

86

87

class Topic:

88

"""Represents a Service Bus topic."""

89

def __init__(self): ...

90

91

default_message_time_to_live: str # Default TTL

92

max_size_in_megabytes: int # Maximum topic size

93

requires_duplicate_detection: bool # Duplicate detection enabled

94

duplicate_detection_history_time_window: str # Duplicate detection window

95

enable_batched_operations: bool # Batched operations enabled

96

size_in_bytes: int # Current size in bytes

97

filtering_messages_before_publishing: bool # Pre-publish filtering

98

99

class Subscription:

100

"""Represents a Service Bus subscription."""

101

def __init__(self): ...

102

103

lock_duration: str # Message lock duration

104

requires_session: bool # Session support enabled

105

default_message_time_to_live: str # Default TTL

106

dead_lettering_on_message_expiration: bool # Dead letter on expiration

107

dead_lettering_on_filter_evaluation_exceptions: bool # Dead letter on filter errors

108

enable_batched_operations: bool # Batched operations enabled

109

max_delivery_count: int # Maximum delivery attempts

110

message_count: int # Current message count

111

112

class Message:

113

"""Represents a Service Bus message."""

114

def __init__(self, body=None, **kwargs): ...

115

116

body: str # Message body

117

content_type: str # Content type

118

correlation_id: str # Correlation identifier

119

delivery_count: int # Delivery attempt count

120

enqueued_time_utc: str # Enqueue timestamp

121

expires_at_utc: str # Expiration timestamp

122

label: str # Message label

123

message_id: str # Message identifier

124

reply_to: str # Reply-to address

125

reply_to_session_id: str # Reply-to session ID

126

scheduled_enqueue_time_utc: str # Scheduled enqueue time

127

session_id: str # Session identifier

128

time_to_live: int # Time to live in seconds

129

to: str # Destination address

130

broker_properties: dict # Broker properties

131

custom_properties: dict # Custom properties

132

133

class Rule:

134

"""Represents a Service Bus subscription rule."""

135

def __init__(self): ...

136

137

action: object # Rule action

138

filter: object # Rule filter

139

name: str # Rule name

140

```

141

142

### Service Bus Exceptions

143

144

Exception types for Service Bus operations.

145

146

```python { .api }

147

class AzureServiceBusPeekLockError(Exception):

148

"""Exception raised for peek-lock operation errors."""

149

pass

150

151

class AzureServiceBusResourceNotFound(Exception):

152

"""Exception raised when a Service Bus resource is not found."""

153

pass

154

```

155

156

### Azure Event Grid

157

158

Provides event routing and delivery service for building event-driven applications. Supports custom topics and system event subscriptions.

159

160

```python { .api }

161

class EventGridClient:

162

"""

163

Client for Azure Event Grid operations.

164

165

Parameters:

166

- credentials: Authentication credentials

167

"""

168

def __init__(self, credentials, **kwargs): ...

169

170

def publish_events(self, topic_hostname: str, events: list, **kwargs):

171

"""

172

Publish events to an Event Grid topic.

173

174

Parameters:

175

- topic_hostname: str, Event Grid topic hostname

176

- events: list, List of events to publish

177

"""

178

```

179

180

### Event Grid Models

181

182

```python { .api }

183

class EventGridEvent:

184

"""Represents an Event Grid event."""

185

def __init__(self, subject: str, event_type: str, data: object, data_version: str, **kwargs): ...

186

187

id: str # Event identifier

188

subject: str # Event subject

189

data: object # Event data

190

event_type: str # Event type

191

event_time: str # Event timestamp

192

data_version: str # Data schema version

193

metadata_version: str # Metadata schema version

194

topic: str # Event topic

195

196

class CloudEvent:

197

"""Represents a CloudEvents specification event."""

198

def __init__(self, source: str, type: str, **kwargs): ...

199

200

id: str # Event identifier

201

source: str # Event source

202

spec_version: str # CloudEvents specification version

203

type: str # Event type

204

data: object # Event data

205

data_content_type: str # Data content type

206

subject: str # Event subject

207

time: str # Event timestamp

208

```

209

210

## Constants and Configuration

211

212

### Service Bus Constants

213

214

```python { .api }

215

# Authentication constants

216

DEFAULT_RULE_NAME: str # Default subscription rule name

217

AZURE_SERVICEBUS_NAMESPACE: str # Environment variable for namespace

218

AZURE_SERVICEBUS_ACCESS_KEY: str # Environment variable for access key

219

AZURE_SERVICEBUS_ISSUER: str # Environment variable for issuer

220

221

# Service endpoints

222

SERVICE_BUS_HOST_BASE: str # Service Bus host base URL

223

224

# Configuration

225

DEFAULT_HTTP_TIMEOUT: int # Default HTTP request timeout

226

```

227

228

## Usage Examples

229

230

### Working with Service Bus Queues

231

232

```python

233

from azure.servicebus import ServiceBusService

234

from azure.servicebus.models import Message

235

236

# Create Service Bus client

237

sbs = ServiceBusService(

238

service_namespace='mynamespace',

239

account_key='mykey'

240

)

241

242

# Create a queue

243

sbs.create_queue('myqueue')

244

245

# Send a message

246

message = Message('Hello, World!')

247

message.custom_properties = {'priority': 'high'}

248

sbs.send_queue_message('myqueue', message)

249

250

# Receive a message

251

message = sbs.receive_queue_message('myqueue', peek_lock=True)

252

if message.body:

253

print(f"Received: {message.body}")

254

print(f"Priority: {message.custom_properties.get('priority')}")

255

256

# Delete the message after processing

257

sbs.delete_queue_message('myqueue', message)

258

259

# List queues

260

queues = sbs.list_queues()

261

for queue in queues:

262

print(f"Queue: {queue.name}, Messages: {queue.message_count}")

263

```

264

265

### Working with Service Bus Topics and Subscriptions

266

267

```python

268

from azure.servicebus import ServiceBusService

269

from azure.servicebus.models import Message, Topic, Subscription, Rule

270

271

# Create topic

272

topic = Topic()

273

topic.max_size_in_megabytes = 5120

274

topic.default_message_time_to_live = 'PT1H' # 1 hour

275

sbs.create_topic('mytopic', topic)

276

277

# Create subscription

278

subscription = Subscription()

279

subscription.lock_duration = 'PT1M' # 1 minute

280

subscription.max_delivery_count = 10

281

sbs.create_subscription('mytopic', 'mysubscription', subscription)

282

283

# Send message to topic

284

message = Message('Topic message')

285

message.label = 'notification'

286

sbs.send_topic_message('mytopic', message)

287

288

# Receive message from subscription

289

message = sbs.receive_subscription_message('mytopic', 'mysubscription', peek_lock=True)

290

if message.body:

291

print(f"Received from subscription: {message.body}")

292

sbs.delete_subscription_message('mytopic', 'mysubscription', message)

293

294

# Create a subscription rule

295

rule = Rule()

296

rule.filter = {'type': 'SqlFilter', 'sqlExpression': "label = 'notification'"}

297

sbs.create_rule('mytopic', 'mysubscription', 'notification-rule', rule)

298

```

299

300

### Working with Event Grid

301

302

```python

303

from azure.eventgrid import EventGridClient

304

from azure.eventgrid.models import EventGridEvent

305

from datetime import datetime

306

307

# Create Event Grid client

308

eg_client = EventGridClient(credentials)

309

310

# Create events

311

events = [

312

EventGridEvent(

313

id='event-1',

314

subject='user/signup',

315

data={'userId': '123', 'email': 'user@example.com'},

316

event_type='UserSignup',

317

event_time=datetime.utcnow().isoformat() + 'Z',

318

data_version='1.0'

319

),

320

EventGridEvent(

321

id='event-2',

322

subject='order/created',

323

data={'orderId': '456', 'amount': 99.99},

324

event_type='OrderCreated',

325

event_time=datetime.utcnow().isoformat() + 'Z',

326

data_version='1.0'

327

)

328

]

329

330

# Publish events

331

topic_hostname = 'mytopic.westus2-1.eventgrid.azure.net'

332

eg_client.publish_events(topic_hostname, events)

333

print(f"Published {len(events)} events to {topic_hostname}")

334

```

335

336

### Error Handling

337

338

```python

339

from azure.servicebus.models import AzureServiceBusResourceNotFound, AzureServiceBusPeekLockError

340

341

try:

342

# Attempt to get a non-existent queue

343

queue = sbs.get_queue('nonexistent-queue')

344

except AzureServiceBusResourceNotFound:

345

print("Queue not found")

346

347

try:

348

# Attempt to receive from empty queue with short timeout

349

message = sbs.receive_queue_message('myqueue', timeout=1)

350

if not message.body:

351

print("No messages available")

352

except AzureServiceBusPeekLockError as e:

353

print(f"Peek-lock error: {e}")

354

```