0
# Messaging and Integration
1
2
Azure messaging and integration services provide reliable message queuing, event distribution, and enterprise integration capabilities. This includes Service Bus for enterprise messaging and Event Grid for event-driven architectures.
3
4
## Capabilities
5
6
### Azure Service Bus
7
8
Provides reliable cloud messaging as a service and simple hybrid integration. Supports queues for point-to-point communication and topics/subscriptions for publish-subscribe patterns.
9
10
```python { .api }
11
class ServiceBusService:
12
"""
13
Client for Azure Service Bus operations.
14
15
Parameters:
16
- service_namespace: str, Service Bus namespace name (optional)
17
- account_key: str, Service Bus access key (optional)
18
- issuer: str, Service Bus issuer (optional)
19
- host_base: str, Service Bus host base URL (optional)
20
- timeout: int, Request timeout in seconds (optional)
21
"""
22
def __init__(self, service_namespace=None, account_key=None, issuer=None,
23
host_base=None, timeout=60, **kwargs): ...
24
25
# Queue operations
26
def create_queue(self, queue_name: str, queue=None, fail_on_exist=False): ...
27
def delete_queue(self, queue_name: str, fail_not_exist=False): ...
28
def get_queue(self, queue_name: str): ...
29
def list_queues(self): ...
30
def send_queue_message(self, queue_name: str, message): ...
31
def peek_lock_queue_message(self, queue_name: str, timeout=60): ...
32
def receive_queue_message(self, queue_name: str, peek_lock=True, timeout=60): ...
33
def delete_queue_message(self, queue_name: str, message): ...
34
def unlock_queue_message(self, queue_name: str, message): ...
35
def renew_lock_queue_message(self, queue_name: str, message): ...
36
37
# Topic operations
38
def create_topic(self, topic_name: str, topic=None, fail_on_exist=False): ...
39
def delete_topic(self, topic_name: str, fail_not_exist=False): ...
40
def get_topic(self, topic_name: str): ...
41
def list_topics(self): ...
42
def send_topic_message(self, topic_name: str, message): ...
43
44
# Subscription operations
45
def create_subscription(self, topic_name: str, subscription_name: str,
46
subscription=None, fail_on_exist=False): ...
47
def delete_subscription(self, topic_name: str, subscription_name: str,
48
fail_not_exist=False): ...
49
def get_subscription(self, topic_name: str, subscription_name: str): ...
50
def list_subscriptions(self, topic_name: str): ...
51
def receive_subscription_message(self, topic_name: str, subscription_name: str,
52
peek_lock=True, timeout=60): ...
53
def delete_subscription_message(self, topic_name: str, subscription_name: str, message): ...
54
def unlock_subscription_message(self, topic_name: str, subscription_name: str, message): ...
55
def renew_lock_subscription_message(self, topic_name: str, subscription_name: str, message): ...
56
57
# Rule operations
58
def create_rule(self, topic_name: str, subscription_name: str, rule_name: str,
59
rule=None, fail_on_exist=False): ...
60
def delete_rule(self, topic_name: str, subscription_name: str, rule_name: str,
61
fail_not_exist=False): ...
62
def get_rule(self, topic_name: str, subscription_name: str, rule_name: str): ...
63
def list_rules(self, topic_name: str, subscription_name: str): ...
64
```
65
66
### Service Bus Models
67
68
Data models for Service Bus entities and messages.
69
70
```python { .api }
71
class Queue:
72
"""Represents a Service Bus queue."""
73
def __init__(self): ...
74
75
lock_duration: str # Message lock duration
76
max_size_in_megabytes: int # Maximum queue size
77
requires_duplicate_detection: bool # Duplicate detection enabled
78
requires_session: bool # Session support enabled
79
default_message_time_to_live: str # Default TTL
80
dead_lettering_on_message_expiration: bool # Dead letter on expiration
81
duplicate_detection_history_time_window: str # Duplicate detection window
82
max_delivery_count: int # Maximum delivery attempts
83
enable_batched_operations: bool # Batched operations enabled
84
size_in_bytes: int # Current size in bytes
85
message_count: int # Current message count
86
87
class Topic:
88
"""Represents a Service Bus topic."""
89
def __init__(self): ...
90
91
default_message_time_to_live: str # Default TTL
92
max_size_in_megabytes: int # Maximum topic size
93
requires_duplicate_detection: bool # Duplicate detection enabled
94
duplicate_detection_history_time_window: str # Duplicate detection window
95
enable_batched_operations: bool # Batched operations enabled
96
size_in_bytes: int # Current size in bytes
97
filtering_messages_before_publishing: bool # Pre-publish filtering
98
99
class Subscription:
100
"""Represents a Service Bus subscription."""
101
def __init__(self): ...
102
103
lock_duration: str # Message lock duration
104
requires_session: bool # Session support enabled
105
default_message_time_to_live: str # Default TTL
106
dead_lettering_on_message_expiration: bool # Dead letter on expiration
107
dead_lettering_on_filter_evaluation_exceptions: bool # Dead letter on filter errors
108
enable_batched_operations: bool # Batched operations enabled
109
max_delivery_count: int # Maximum delivery attempts
110
message_count: int # Current message count
111
112
class Message:
113
"""Represents a Service Bus message."""
114
def __init__(self, body=None, **kwargs): ...
115
116
body: str # Message body
117
content_type: str # Content type
118
correlation_id: str # Correlation identifier
119
delivery_count: int # Delivery attempt count
120
enqueued_time_utc: str # Enqueue timestamp
121
expires_at_utc: str # Expiration timestamp
122
label: str # Message label
123
message_id: str # Message identifier
124
reply_to: str # Reply-to address
125
reply_to_session_id: str # Reply-to session ID
126
scheduled_enqueue_time_utc: str # Scheduled enqueue time
127
session_id: str # Session identifier
128
time_to_live: int # Time to live in seconds
129
to: str # Destination address
130
broker_properties: dict # Broker properties
131
custom_properties: dict # Custom properties
132
133
class Rule:
134
"""Represents a Service Bus subscription rule."""
135
def __init__(self): ...
136
137
action: object # Rule action
138
filter: object # Rule filter
139
name: str # Rule name
140
```
141
142
### Service Bus Exceptions
143
144
Exception types for Service Bus operations.
145
146
```python { .api }
147
class AzureServiceBusPeekLockError(Exception):
148
"""Exception raised for peek-lock operation errors."""
149
pass
150
151
class AzureServiceBusResourceNotFound(Exception):
152
"""Exception raised when a Service Bus resource is not found."""
153
pass
154
```
155
156
### Azure Event Grid
157
158
Provides an event routing and delivery service for building event-driven applications. Supports custom topics and system event subscriptions.
159
160
```python { .api }
161
class EventGridClient:
162
"""
163
Client for Azure Event Grid operations.
164
165
Parameters:
166
- credentials: Authentication credentials
167
"""
168
def __init__(self, credentials, **kwargs): ...
169
170
def publish_events(self, topic_hostname: str, events: list, **kwargs):
171
"""
172
Publish events to an Event Grid topic.
173
174
Parameters:
175
- topic_hostname: str, Event Grid topic hostname
176
- events: list, List of events to publish
177
"""
178
```
179
180
### Event Grid Models
181
182
```python { .api }
183
class EventGridEvent:
184
"""Represents an Event Grid event."""
185
def __init__(self, subject: str, event_type: str, data: object, data_version: str, **kwargs): ...
186
187
id: str # Event identifier
188
subject: str # Event subject
189
data: object # Event data
190
event_type: str # Event type
191
event_time: str # Event timestamp
192
data_version: str # Data schema version
193
metadata_version: str # Metadata schema version
194
topic: str # Event topic
195
196
class CloudEvent:
197
"""Represents a CloudEvents specification event."""
198
def __init__(self, source: str, type: str, **kwargs): ...
199
200
id: str # Event identifier
201
source: str # Event source
202
spec_version: str # CloudEvents specification version
203
type: str # Event type
204
data: object # Event data
205
data_content_type: str # Data content type
206
subject: str # Event subject
207
time: str # Event timestamp
208
```
209
210
## Constants and Configuration
211
212
### Service Bus Constants
213
214
```python { .api }
215
# Default names and authentication environment variables
216
DEFAULT_RULE_NAME: str # Default subscription rule name
217
AZURE_SERVICEBUS_NAMESPACE: str # Environment variable for namespace
218
AZURE_SERVICEBUS_ACCESS_KEY: str # Environment variable for access key
219
AZURE_SERVICEBUS_ISSUER: str # Environment variable for issuer
220
221
# Service endpoints
222
SERVICE_BUS_HOST_BASE: str # Service Bus host base URL
223
224
# Configuration
225
DEFAULT_HTTP_TIMEOUT: int # Default HTTP request timeout
226
```
227
228
## Usage Examples
229
230
### Working with Service Bus Queues
231
232
```python
233
from azure.servicebus import ServiceBusService
234
from azure.servicebus.models import Message
235
236
# Create Service Bus client
237
sbs = ServiceBusService(
238
service_namespace='mynamespace',
239
account_key='mykey'
240
)
241
242
# Create a queue
243
sbs.create_queue('myqueue')
244
245
# Send a message
246
message = Message('Hello, World!')
247
message.custom_properties = {'priority': 'high'}
248
sbs.send_queue_message('myqueue', message)
249
250
# Receive a message
251
message = sbs.receive_queue_message('myqueue', peek_lock=True)
252
if message.body:
253
print(f"Received: {message.body}")
254
print(f"Priority: {message.custom_properties.get('priority')}")
255
256
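# Delete the message after processing (with peek_lock=True the message stays locked on the queue until it is explicitly deleted or unlocked)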
257
sbs.delete_queue_message('myqueue', message)
258
259
# List queues
260
queues = sbs.list_queues()
261
for queue in queues:
262
print(f"Queue: {queue.name}, Messages: {queue.message_count}")
263
```
264
265
### Working with Service Bus Topics and Subscriptions
266
267
```python
268
from azure.servicebus import ServiceBusService
269
from azure.servicebus.models import Message, Topic, Subscription, Rule
270
271
# Create topic (assumes `sbs` is a ServiceBusService client, created as in the queue example above)
272
topic = Topic()
273
topic.max_size_in_megabytes = 5120
274
topic.default_message_time_to_live = 'PT1H' # 1 hour
275
sbs.create_topic('mytopic', topic)
276
277
# Create subscription
278
subscription = Subscription()
279
subscription.lock_duration = 'PT1M' # 1 minute
280
subscription.max_delivery_count = 10
281
sbs.create_subscription('mytopic', 'mysubscription', subscription)
282
283
# Send message to topic
284
message = Message('Topic message')
285
message.label = 'notification'
286
sbs.send_topic_message('mytopic', message)
287
288
# Receive message from subscription
289
message = sbs.receive_subscription_message('mytopic', 'mysubscription', peek_lock=True)
290
if message.body:
291
print(f"Received from subscription: {message.body}")
292
sbs.delete_subscription_message('mytopic', 'mysubscription', message)
293
294
# Create a subscription rule
295
rule = Rule()
296
rule.filter = {'type': 'SqlFilter', 'sqlExpression': "label = 'notification'"}
297
sbs.create_rule('mytopic', 'mysubscription', 'notification-rule', rule)
298
```
299
300
### Working with Event Grid
301
302
```python
303
from azure.eventgrid import EventGridClient
304
from azure.eventgrid.models import EventGridEvent
305
from datetime import datetime
306
307
# Create Event Grid client (`credentials` must be defined beforehand, e.g. credentials built from the topic access key)
308
eg_client = EventGridClient(credentials)
309
310
# Create events
311
events = [
312
EventGridEvent(
313
id='event-1',
314
subject='user/signup',
315
data={'userId': '123', 'email': 'user@example.com'},
316
event_type='UserSignup',
317
event_time=datetime.utcnow().isoformat() + 'Z',
318
data_version='1.0'
319
),
320
EventGridEvent(
321
id='event-2',
322
subject='order/created',
323
data={'orderId': '456', 'amount': 99.99},
324
event_type='OrderCreated',
325
event_time=datetime.utcnow().isoformat() + 'Z',
326
data_version='1.0'
327
)
328
]
329
330
# Publish events
331
topic_hostname = 'mytopic.westus2-1.eventgrid.azure.net'
332
eg_client.publish_events(topic_hostname, events)
333
print(f"Published {len(events)} events to {topic_hostname}")
334
```
335
336
### Error Handling
337
338
```python
339
from azure.servicebus.models import AzureServiceBusResourceNotFound, AzureServiceBusPeekLockError
340
341
try:
342
# Attempt to get a non-existent queue
343
queue = sbs.get_queue('nonexistent-queue')
344
except AzureServiceBusResourceNotFound:
345
print("Queue not found")
346
347
try:
348
# Attempt to receive from empty queue with short timeout
349
message = sbs.receive_queue_message('myqueue', timeout=1)
350
if not message.body:
351
print("No messages available")
352
except AzureServiceBusPeekLockError as e:
353
print(f"Peek-lock error: {e}")
354
```