# Amazon SNS/SQS Messaging Services

Amazon Simple Notification Service (SNS) and Simple Queue Service (SQS) provide the messaging building blocks for event-driven architectures: SNS offers pub/sub fan-out to multiple subscribers, SQS offers durable message queuing, and together they enable asynchronous communication patterns in data pipelines.

## Capabilities

### SNS Publication and Notification

Publish messages to SNS topics for fan-out messaging patterns and multi-subscriber notifications.

```python { .api }
class SnsPublishOperator(AwsBaseOperator):
    """
    Publish a message to Amazon SNS.

    Parameters:
    - target_arn: str - either a TopicArn or an EndpointArn
    - message: str - the default message you want to send
    - subject: str - the message subject you want to send
    - message_attributes: dict - message attributes as a flat dict
    - message_deduplication_id: str - unique message deduplication ID (FIFO topics only)
    - message_group_id: str - message group ID (FIFO topics only)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration

    Returns:
        str: Message ID from SNS
    """
    def __init__(
        self,
        *,
        target_arn: str,
        message: str,
        subject: str = None,
        message_attributes: dict = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        **kwargs
    ): ...
```

### SNS Service Hook

Low-level SNS operations for topic management and message publishing.

```python { .api }
class SnsHook(AwsBaseHook):
    """
    Hook for Amazon SNS operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str = None,
        verify: bool = None,
        botocore_config: dict = None,
        **kwargs
    ): ...

    def publish_to_target(
        self,
        target_arn: str,
        message: str,
        subject: str = None,
        message_attributes: dict = None,
        message_deduplication_id: str = None,
        message_group_id: str = None
    ) -> str:
        """
        Publish a message to an SNS topic or endpoint.

        Parameters:
        - target_arn: str - TopicArn or EndpointArn
        - message: str - message content
        - subject: str - message subject
        - message_attributes: dict - message attributes
        - message_deduplication_id: str - deduplication ID for FIFO topics
        - message_group_id: str - group ID for FIFO topics

        Returns:
            str: Message ID
        """
        ...

    def create_topic(self, name: str, attributes: dict = None, tags: dict = None) -> str:
        """
        Create an SNS topic.

        Parameters:
        - name: str - topic name
        - attributes: dict - topic attributes
        - tags: dict - topic tags

        Returns:
            str: Topic ARN
        """
        ...

    def delete_topic(self, topic_arn: str) -> None:
        """Delete an SNS topic."""
        ...

    def list_topics(self, next_token: str = None) -> dict:
        """List SNS topics."""
        ...

    def get_topic_attributes(self, topic_arn: str) -> dict:
        """Get attributes for an SNS topic."""
        ...
```
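
The topic-management methods above can be combined into a simple provisioning helper. A minimal sketch, assuming these methods are available in your provider version (the topic name, attributes, and tags are illustrative, and the exact attribute response shape follows the underlying boto3 call):

```python
from airflow.providers.amazon.aws.hooks.sns import SnsHook

def manage_notification_topic():
    """Sketch: create, inspect, and delete an SNS topic via SnsHook."""
    sns_hook = SnsHook(aws_conn_id='aws_default', region_name='us-west-2')

    # Create a topic for pipeline notifications (name and tags are illustrative)
    topic_arn = sns_hook.create_topic(
        name='data-pipeline-notifications',
        attributes={'DisplayName': 'Data Pipeline Notifications'},
        tags={'team': 'data-engineering'},
    )

    # Inspect the topic before wiring up subscribers
    attributes = sns_hook.get_topic_attributes(topic_arn=topic_arn)
    print(f"Topic attributes: {attributes}")

    # Tear the topic down once it is no longer needed
    sns_hook.delete_topic(topic_arn=topic_arn)
```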

### SQS Message Publishing

Send messages to SQS queues with support for standard and FIFO queues.

```python { .api }
class SqsPublishOperator(AwsBaseOperator):
    """
    Publish a message to an Amazon SQS queue.

    Parameters:
    - sqs_queue: str - SQS queue URL
    - message_content: str - message content
    - message_attributes: dict - additional message attributes
    - delay_seconds: int - message delay (default: 0)
    - message_group_id: str - message group ID (FIFO queues only)
    - message_deduplication_id: str - deduplication ID (FIFO queues only)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration

    Returns:
        dict: Information about the message sent
    """
    def __init__(
        self,
        *,
        sqs_queue: str,
        message_content: str,
        message_attributes: dict = None,
        delay_seconds: int = 0,
        message_group_id: str = None,
        message_deduplication_id: str = None,
        **kwargs
    ): ...
```

### SQS Message Processing and Sensing

Monitor SQS queues and process messages with support for long polling, batching, and message filtering.

```python { .api }
class SqsSensor(BaseSensorOperator):
    """
    Poll an Amazon SQS queue and process available messages.

    Parameters:
    - sqs_queue: str - SQS queue URL
    - max_messages: int - maximum number of messages to receive (1-10)
    - num_batches: int - number of batches to process
    - wait_time_seconds: int - long polling wait time (0-20 seconds)
    - visibility_timeout_seconds: int - message visibility timeout
    - message_filtering: str - filtering method for messages ('literal' or 'jsonpath')
    - message_filtering_match_values: list - values to match for filtering
    - message_filtering_config: str - additional filtering configuration (a JSONPath expression when message_filtering='jsonpath')
    - delete_message_on_reception: bool - delete message after receiving
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name

    Returns:
        list: Received messages
    """
    def __init__(
        self,
        sqs_queue: str,
        max_messages: int = 5,
        num_batches: int = 1,
        wait_time_seconds: int = 1,
        visibility_timeout_seconds: int = None,
        message_filtering: str = None,
        message_filtering_match_values: list = None,
        message_filtering_config: str = None,
        delete_message_on_reception: bool = True,
        **kwargs
    ): ...
```
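
When full JSONPath filtering is not needed, literal filtering compares the raw message body against the supplied match values. A minimal sketch, assuming a control queue whose messages carry plain string bodies (the queue URL and match values are illustrative):

```python
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

# Only react to control messages whose entire body is one of the listed literals
wait_for_go_signal = SqsSensor(
    task_id='wait_for_go_signal',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/pipeline-control',
    max_messages=1,
    wait_time_seconds=20,  # long polling
    message_filtering='literal',
    message_filtering_match_values=['START', 'RESUME'],
    poke_interval=60,
    timeout=1800,
    aws_conn_id='aws_default'
)
```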

### SQS Service Hook

Comprehensive SQS operations for queue management and message handling.

```python { .api }
class SqsHook(AwsBaseHook):
    """
    Hook for Amazon SQS operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str = None,
        verify: bool = None,
        botocore_config: dict = None,
        **kwargs
    ): ...

    def send_message(
        self,
        queue_url: str,
        message_body: str,
        delay_seconds: int = 0,
        message_attributes: dict = None,
        message_group_id: str = None,
        message_deduplication_id: str = None
    ) -> dict:
        """
        Send a message to an SQS queue.

        Parameters:
        - queue_url: str - SQS queue URL
        - message_body: str - message content
        - delay_seconds: int - delivery delay
        - message_attributes: dict - message attributes
        - message_group_id: str - group ID for FIFO queues
        - message_deduplication_id: str - deduplication ID for FIFO queues

        Returns:
            dict: Send message response
        """
        ...

    def receive_message(
        self,
        queue_url: str,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 0,
        visibility_timeout_seconds: int = None,
        message_attribute_names: list = None,
        receive_request_attempt_id: str = None
    ) -> list[dict]:
        """
        Receive messages from an SQS queue.

        Parameters:
        - queue_url: str - SQS queue URL
        - max_number_of_messages: int - maximum messages to receive (1-10)
        - wait_time_seconds: int - long polling wait time (0-20)
        - visibility_timeout_seconds: int - message visibility timeout
        - message_attribute_names: list - attributes to retrieve
        - receive_request_attempt_id: str - deduplication ID for FIFO queues

        Returns:
            list: Received messages
        """
        ...

    def delete_message(self, queue_url: str, receipt_handle: str) -> dict:
        """Delete a message from an SQS queue."""
        ...

    def create_queue(
        self,
        queue_name: str,
        attributes: dict = None,
        tags: dict = None
    ) -> str:
        """
        Create an SQS queue.

        Parameters:
        - queue_name: str - queue name
        - attributes: dict - queue attributes
        - tags: dict - queue tags

        Returns:
            str: Queue URL
        """
        ...

    def delete_queue(self, queue_url: str) -> dict:
        """Delete an SQS queue."""
        ...

    def get_queue_attributes(self, queue_url: str, attribute_names: list = None) -> dict:
        """Get attributes for an SQS queue."""
        ...

    def purge_queue(self, queue_url: str) -> dict:
        """Purge all messages from an SQS queue."""
        ...
```
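
The queue-management methods above cover a provision-inspect-purge lifecycle. A minimal sketch, assuming these methods are available in your provider version (the queue name, attributes, and tags are illustrative):

```python
from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def reset_staging_queue():
    """Sketch: provision a staging queue, check its backlog, and purge it."""
    sqs_hook = SqsHook(aws_conn_id='aws_default', region_name='us-west-2')

    # Provision (or look up) the staging queue
    queue_url = sqs_hook.create_queue(
        queue_name='staging-reprocessing',
        attributes={'VisibilityTimeout': '300'},
        tags={'team': 'data-engineering'},
    )

    # Check the backlog; the exact response shape follows the underlying boto3 call
    attributes = sqs_hook.get_queue_attributes(
        queue_url=queue_url,
        attribute_names=['ApproximateNumberOfMessages'],
    )
    print(f"Queue attributes: {attributes}")

    # Drop any leftover messages before the next load
    sqs_hook.purge_queue(queue_url=queue_url)
```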

### Event-Driven Queue Processing

Process SQS messages with custom handling and automatic message management.

```python { .api }
class SqsExecutor:
    """
    Executor for processing SQS messages in Airflow workflows.

    Parameters:
    - queue_url: str - SQS queue URL
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        queue_url: str,
        aws_conn_id: str = 'aws_default',
        region_name: str = None
    ): ...

    def submit_job(self, job_name: str, job_kwargs: dict) -> str:
        """Submit a job message to the SQS queue."""
        ...

    def heartbeat(self) -> None:
        """Send heartbeat for long-running operations."""
        ...
```
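
The interface above suggests a simple submission loop driven from a task callable. This is only a sketch against the signatures documented here; the queue URL, job names, and payload keys are illustrative, and the availability of SqsExecutor should be confirmed in your environment:

```python
def submit_nightly_jobs(**context):
    """Sketch: enqueue processing jobs through the SqsExecutor interface above."""
    executor = SqsExecutor(
        queue_url='https://sqs.us-west-2.amazonaws.com/123456789012/data-processing-jobs',
        aws_conn_id='aws_default',
        region_name='us-west-2',
    )

    # Submit one job per table; job_kwargs carries the per-job parameters
    for table in ('orders', 'customers', 'inventory'):
        message_id = executor.submit_job(
            job_name=f'nightly-load-{table}',
            job_kwargs={'table': table, 'run_date': context['ds']},
        )
        print(f'Queued {table} load as message {message_id}')

    # Signal liveness once the submission loop finishes
    executor.heartbeat()
```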

## Usage Examples

### Event Publication with SNS

```python
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

# Publish workflow completion notification
notify_completion = SnsPublishOperator(
    task_id='notify_data_pipeline_complete',
    target_arn='arn:aws:sns:us-west-2:123456789012:data-pipeline-notifications',
    subject='Data Pipeline Completed Successfully',
    message="""
    Data pipeline execution completed successfully.

    Pipeline: {{ dag.dag_id }}
    Execution Date: {{ ds }}
    Run ID: {{ run_id }}

    All data processing tasks completed without errors.
    """,
    message_attributes={
        'pipeline_name': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        },
        'execution_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'status': {
            'DataType': 'String',
            'StringValue': 'SUCCESS'
        }
    },
    aws_conn_id='aws_default'
)
```
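
For FIFO topics, the same operator accepts the message_group_id and message_deduplication_id parameters documented above. A brief sketch, assuming a .fifo topic (the ARN and ID templates are illustrative):

```python
# Publish to a FIFO topic: ordering within a group plus explicit deduplication
notify_fifo = SnsPublishOperator(
    task_id='notify_pipeline_complete_fifo',
    target_arn='arn:aws:sns:us-west-2:123456789012:pipeline-events.fifo',
    subject='Data Pipeline Completed',
    message='Pipeline {{ dag.dag_id }} finished for {{ ds }}',
    message_group_id='pipeline-{{ dag.dag_id }}',                 # ordering scope
    message_deduplication_id='{{ dag.dag_id }}-{{ ds_nodash }}',  # suppress duplicate publishes
    aws_conn_id='aws_default'
)
```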

### Message Queue Processing

```python
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

# Send processing job to queue
queue_job = SqsPublishOperator(
    task_id='queue_processing_job',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/data-processing-jobs',
    message_content='{{ ti.xcom_pull(task_ids="prepare_job_config") | tojson }}',
    message_attributes={
        'job_type': {
            'DataType': 'String',
            'StringValue': 'batch_processing'
        },
        'priority': {
            'DataType': 'Number',
            'StringValue': '5'
        },
        'source_dag': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        }
    },
    delay_seconds=30,  # Delay processing by 30 seconds
    aws_conn_id='aws_default'
)

# Monitor job completion messages
monitor_completion = SqsSensor(
    task_id='monitor_job_completion',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/job-completion-notifications',
    max_messages=1,
    wait_time_seconds=20,  # Long polling
    message_filtering='jsonpath',
    message_filtering_config='status',  # JSONPath expression applied to the message body
    message_filtering_match_values=['SUCCESS', 'COMPLETED'],
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    aws_conn_id='aws_default'
)

queue_job >> monitor_completion
```

### FIFO Queue for Ordered Processing

```python
# Send ordered messages to FIFO queue
send_ordered_messages = SqsPublishOperator(
    task_id='send_file_processing_order',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/file-processing-order.fifo',
    message_content='{{ ti.xcom_pull(task_ids="list_files") | tojson }}',
    message_group_id='file-batch-{{ ds_nodash }}',  # Group by date
    message_deduplication_id='file-processing-{{ ds_nodash }}-{{ ti.try_number }}',
    message_attributes={
        'batch_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'file_count': {
            'DataType': 'Number',
            'StringValue': '{{ ti.xcom_pull(task_ids="count_files") }}'
        }
    },
    aws_conn_id='aws_default'
)
```
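
If the FIFO queue itself needs to be provisioned from a DAG, the SqsHook create_queue method documented earlier can do it. A minimal sketch (FifoQueue and ContentBasedDeduplication are standard SQS queue attributes; the queue name must end in .fifo):

```python
from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def create_fifo_queue():
    """Sketch: provision the FIFO queue used by send_ordered_messages."""
    sqs_hook = SqsHook(aws_conn_id='aws_default', region_name='us-west-2')
    return sqs_hook.create_queue(
        queue_name='file-processing-order.fifo',   # FIFO queue names must end in .fifo
        attributes={
            'FifoQueue': 'true',
            'ContentBasedDeduplication': 'false',  # explicit deduplication IDs are supplied
        },
    )
```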

### Custom Message Processing

```python
import json

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def process_sqs_messages(**context):
    """Custom function to process SQS messages."""
    sqs_hook = SqsHook(aws_conn_id='aws_default')

    queue_url = 'https://sqs.us-west-2.amazonaws.com/123456789012/processing-queue'

    # Receive messages
    messages = sqs_hook.receive_message(
        queue_url=queue_url,
        max_number_of_messages=10,
        wait_time_seconds=20,
        visibility_timeout_seconds=300
    )

    processed_count = 0

    for message in messages:
        try:
            # Process message body
            message_body = json.loads(message['Body'])

            # Custom processing logic (process_data_file is user-defined)
            process_data_file(message_body['file_path'])

            # Delete message after successful processing
            sqs_hook.delete_message(
                queue_url=queue_url,
                receipt_handle=message['ReceiptHandle']
            )

            processed_count += 1

        except Exception as e:
            print(f"Error processing message: {e}")
            # Message will become visible again after visibility timeout

    return f"Processed {processed_count} messages"

# Use with PythonOperator
process_messages = PythonOperator(
    task_id='process_queue_messages',
    python_callable=process_sqs_messages
)
```

### Multi-Channel Event Broadcasting

```python
from airflow.operators.python import PythonOperator

# Broadcast event to multiple SNS topics
def broadcast_event(**context):
    """Broadcast event to multiple notification channels."""
    from airflow.providers.amazon.aws.hooks.sns import SnsHook

    sns_hook = SnsHook(aws_conn_id='aws_default')

    event_data = context['ti'].xcom_pull(task_ids='generate_event_data')

    # Different topics for different audiences
    topics = {
        'engineering': 'arn:aws:sns:us-west-2:123456789012:engineering-alerts',
        'operations': 'arn:aws:sns:us-west-2:123456789012:ops-notifications',
        'business': 'arn:aws:sns:us-west-2:123456789012:business-updates'
    }

    for audience, topic_arn in topics.items():
        # Customize message for each audience
        if audience == 'engineering':
            message = f"Technical Alert: {event_data['technical_details']}"
            subject = f"ALERT: {event_data['system']}"
        elif audience == 'operations':
            message = f"Operational Update: {event_data['summary']}"
            subject = f"OPS: {event_data['service']}"
        else:  # business
            message = f"Business Impact: {event_data['business_impact']}"
            subject = f"Business Update: {event_data['process']}"

        sns_hook.publish_to_target(
            target_arn=topic_arn,
            message=message,
            subject=subject,
            message_attributes={
                'event_type': {
                    'DataType': 'String',
                    'StringValue': event_data['type']
                },
                'severity': {
                    'DataType': 'String',
                    'StringValue': event_data['severity']
                }
            }
        )

    return "Event broadcast to all channels"

broadcast_task = PythonOperator(
    task_id='broadcast_event',
    python_callable=broadcast_event
)
```

### Dead Letter Queue Handling

```python
from airflow.operators.python import PythonOperator

# Monitor dead letter queue for failed messages
monitor_dlq = SqsSensor(
    task_id='monitor_dead_letter_queue',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/processing-dlq',
    max_messages=5,
    wait_time_seconds=10,
    delete_message_on_reception=False,  # Keep for investigation
    timeout=300,  # 5 minute check
    poke_interval=60,
    mode='reschedule',  # Don't block worker
    aws_conn_id='aws_default'
)

def handle_failed_messages(**context):
    """Handle messages in dead letter queue."""
    from airflow.providers.amazon.aws.hooks.sns import SnsHook

    messages = context['ti'].xcom_pull(task_ids='monitor_dead_letter_queue')

    if messages:
        # Alert on failed messages via SNS
        sns_hook = SnsHook(aws_conn_id='aws_default')
        sns_hook.publish_to_target(
            target_arn='arn:aws:sns:us-west-2:123456789012:critical-alerts',
            subject='Dead Letter Queue Alert',
            message=f'Found {len(messages)} failed messages requiring investigation',
            message_attributes={
                'alert_type': {
                    'DataType': 'String',
                    'StringValue': 'DLQ_ALERT'
                },
                'message_count': {
                    'DataType': 'Number',
                    'StringValue': str(len(messages))
                }
            }
        )
        return f"Alerted on {len(messages)} failed messages"

    return "No failed messages found"

handle_dlq = PythonOperator(
    task_id='handle_dead_letter_messages',
    python_callable=handle_failed_messages
)

monitor_dlq >> handle_dlq
```
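
Routing failures into the DLQ monitored above is configured at provisioning time through the RedrivePolicy queue attribute. A minimal sketch using the SqsHook methods documented earlier (queue names and maxReceiveCount are illustrative; indexing QueueArn assumes the hook returns the attribute mapping directly):

```python
import json

from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def provision_queue_with_dlq():
    """Sketch: create a processing queue whose failures redrive to processing-dlq."""
    sqs_hook = SqsHook(aws_conn_id='aws_default', region_name='us-west-2')

    # Create the DLQ first so its ARN can be referenced in the redrive policy
    dlq_url = sqs_hook.create_queue(queue_name='processing-dlq')
    dlq_attributes = sqs_hook.get_queue_attributes(
        queue_url=dlq_url,
        attribute_names=['QueueArn'],
    )

    # Messages received more than 5 times move to the DLQ monitored above
    return sqs_hook.create_queue(
        queue_name='processing-queue',
        attributes={
            'RedrivePolicy': json.dumps({
                'deadLetterTargetArn': dlq_attributes['QueueArn'],
                'maxReceiveCount': '5',
            }),
        },
    )
```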

## Import Statements

```python
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.providers.amazon.aws.hooks.sns import SnsHook
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier
```