
# Amazon SNS/SQS Messaging Services

Amazon Simple Notification Service (SNS) and Amazon Simple Queue Service (SQS) provide the messaging layer for event-driven architectures, enabling pub/sub fan-out, durable message queuing, and asynchronous communication patterns in data pipelines.

## Capabilities

### SNS Publication and Notification

Publish messages to SNS topics for fan-out messaging patterns and multi-subscriber notifications.

```python { .api }
class SnsPublishOperator(AwsBaseOperator):
    """
    Publish a message to Amazon SNS.

    Parameters:
    - target_arn: str - either a TopicArn or an EndpointArn
    - message: str - the default message you want to send
    - subject: str - the message subject you want to send
    - message_attributes: dict - message attributes as a flat dict
    - message_deduplication_id: str - unique message deduplication ID (FIFO topics only)
    - message_group_id: str - message group ID (FIFO topics only)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration

    Returns:
        str: Message ID from SNS
    """
    def __init__(
        self,
        *,
        target_arn: str,
        message: str,
        subject: str | None = None,
        message_attributes: dict | None = None,
        message_deduplication_id: str | None = None,
        message_group_id: str | None = None,
        **kwargs
    ): ...
```
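
For FIFO topics, `message_group_id` is required and `message_deduplication_id` controls duplicate suppression. A minimal sketch of a FIFO publish; the topic ARN, task IDs, and templated values are illustrative placeholders:

```python
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

# Publish an ordered event to a FIFO topic; the ARN and templated IDs are placeholders
notify_fifo = SnsPublishOperator(
    task_id='notify_order_events',
    target_arn='arn:aws:sns:us-west-2:123456789012:order-events.fifo',
    message='{{ ti.xcom_pull(task_ids="build_event") }}',
    subject='Order event',
    message_group_id='orders-{{ ds_nodash }}',  # preserves ordering within the day's batch
    message_deduplication_id='orders-{{ ds_nodash }}-{{ ti.try_number }}',
    aws_conn_id='aws_default'
)
```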

### SNS Service Hook

Low-level SNS operations for topic management and message publishing.

```python { .api }
class SnsHook(AwsBaseHook):
    """
    Hook for Amazon SNS operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        verify: bool | None = None,
        botocore_config: dict | None = None,
        **kwargs
    ): ...

    def publish_to_target(
        self,
        target_arn: str,
        message: str,
        subject: str | None = None,
        message_attributes: dict | None = None,
        message_deduplication_id: str | None = None,
        message_group_id: str | None = None
    ) -> str:
        """
        Publish a message to an SNS topic or endpoint.

        Parameters:
        - target_arn: str - TopicArn or EndpointArn
        - message: str - message content
        - subject: str - message subject
        - message_attributes: dict - message attributes
        - message_deduplication_id: str - deduplication ID for FIFO topics
        - message_group_id: str - group ID for FIFO topics

        Returns:
            str: Message ID
        """
        ...

    def create_topic(self, name: str, attributes: dict | None = None, tags: dict | None = None) -> str:
        """
        Create an SNS topic.

        Parameters:
        - name: str - topic name
        - attributes: dict - topic attributes
        - tags: dict - topic tags

        Returns:
            str: Topic ARN
        """
        ...

    def delete_topic(self, topic_arn: str) -> None:
        """Delete an SNS topic."""
        ...

    def list_topics(self, next_token: str | None = None) -> dict:
        """List SNS topics."""
        ...

    def get_topic_attributes(self, topic_arn: str) -> dict:
        """Get attributes for an SNS topic."""
        ...
```
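
Beyond publishing, the hook's topic-management methods can be combined inside a Python callable. A minimal sketch against the interface above; the connection ID, region, topic name, and tags are illustrative placeholders:

```python
from airflow.providers.amazon.aws.hooks.sns import SnsHook

def ensure_alert_topic(**context):
    """Create (or reuse) an SNS topic and return its ARN for downstream tasks."""
    sns_hook = SnsHook(aws_conn_id='aws_default', region_name='us-west-2')

    # Creating a topic whose name already exists returns the existing topic's ARN
    topic_arn = sns_hook.create_topic(
        name='pipeline-alerts',
        attributes={'DisplayName': 'Pipeline Alerts'},
        tags={'team': 'data-platform'}
    )

    # Inspect the topic configuration before other tasks publish to it
    print(sns_hook.get_topic_attributes(topic_arn=topic_arn))
    return topic_arn
```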

### SQS Message Publishing

Send messages to SQS queues with support for standard and FIFO queues.

```python { .api }
class SqsPublishOperator(AwsBaseOperator):
    """
    Publish a message to an Amazon SQS queue.

    Parameters:
    - sqs_queue: str - SQS queue URL
    - message_content: str - message content
    - message_attributes: dict - additional message attributes
    - delay_seconds: int - message delay (default: 0)
    - message_group_id: str - message group ID (FIFO queues only)
    - message_deduplication_id: str - deduplication ID (FIFO queues only)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration

    Returns:
        dict: Information about the message sent
    """
    def __init__(
        self,
        *,
        sqs_queue: str,
        message_content: str,
        message_attributes: dict | None = None,
        delay_seconds: int = 0,
        message_group_id: str | None = None,
        message_deduplication_id: str | None = None,
        **kwargs
    ): ...
```

### SQS Message Processing and Sensing

Monitor SQS queues and process messages using long polling, batched reads, and optional message filtering.

```python { .api }
class SqsSensor(BaseSensorOperator):
    """
    Poll an Amazon SQS queue and process available messages.

    Parameters:
    - sqs_queue: str - SQS queue URL
    - max_messages: int - maximum number of messages to receive (1-10)
    - num_batches: int - number of batches to process
    - wait_time_seconds: int - long polling wait time (0-20 seconds)
    - visibility_timeout_seconds: int - message visibility timeout
    - message_filtering: str - filtering method for messages
    - message_filtering_match_values: list - values to match for filtering
    - message_filtering_config: dict - advanced filtering configuration
    - delete_message_on_reception: bool - delete message after receiving
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name

    Returns:
        list: Received messages
    """
    def __init__(
        self,
        sqs_queue: str,
        max_messages: int = 5,
        num_batches: int = 1,
        wait_time_seconds: int = 1,
        visibility_timeout_seconds: int | None = None,
        message_filtering: str | None = None,
        message_filtering_match_values: list | None = None,
        message_filtering_config: dict | None = None,
        delete_message_on_reception: bool = True,
        **kwargs
    ): ...
```

### SQS Service Hook

SQS operations for queue management and for sending, receiving, and deleting messages.

```python { .api }
class SqsHook(AwsBaseHook):
    """
    Hook for Amazon SQS operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        verify: bool | None = None,
        botocore_config: dict | None = None,
        **kwargs
    ): ...

    def send_message(
        self,
        queue_url: str,
        message_body: str,
        delay_seconds: int = 0,
        message_attributes: dict | None = None,
        message_group_id: str | None = None,
        message_deduplication_id: str | None = None
    ) -> dict:
        """
        Send a message to an SQS queue.

        Parameters:
        - queue_url: str - SQS queue URL
        - message_body: str - message content
        - delay_seconds: int - delivery delay
        - message_attributes: dict - message attributes
        - message_group_id: str - group ID for FIFO queues
        - message_deduplication_id: str - deduplication ID for FIFO queues

        Returns:
            dict: Send message response
        """
        ...

    def receive_message(
        self,
        queue_url: str,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 0,
        visibility_timeout_seconds: int | None = None,
        message_attribute_names: list | None = None,
        receive_request_attempt_id: str | None = None
    ) -> list[dict]:
        """
        Receive messages from an SQS queue.

        Parameters:
        - queue_url: str - SQS queue URL
        - max_number_of_messages: int - maximum messages to receive (1-10)
        - wait_time_seconds: int - long polling wait time (0-20)
        - visibility_timeout_seconds: int - message visibility timeout
        - message_attribute_names: list - attributes to retrieve
        - receive_request_attempt_id: str - deduplication ID for FIFO queues

        Returns:
            list: Received messages
        """
        ...

    def delete_message(self, queue_url: str, receipt_handle: str) -> dict:
        """Delete a message from an SQS queue."""
        ...

    def create_queue(
        self,
        queue_name: str,
        attributes: dict | None = None,
        tags: dict | None = None
    ) -> str:
        """
        Create an SQS queue.

        Parameters:
        - queue_name: str - queue name
        - attributes: dict - queue attributes
        - tags: dict - queue tags

        Returns:
            str: Queue URL
        """
        ...

    def delete_queue(self, queue_url: str) -> dict:
        """Delete an SQS queue."""
        ...

    def get_queue_attributes(self, queue_url: str, attribute_names: list | None = None) -> dict:
        """Get attributes for an SQS queue."""
        ...

    def purge_queue(self, queue_url: str) -> dict:
        """Purge all messages from an SQS queue."""
        ...
```
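
The queue-management methods pair naturally with the send/receive calls shown in the usage examples below. A minimal sketch against the interface above; the queue name, attributes, and tags are illustrative placeholders:

```python
from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def prepare_staging_queue(**context):
    """Create a work queue (or reuse an existing one) and return its URL."""
    sqs_hook = SqsHook(aws_conn_id='aws_default', region_name='us-west-2')

    # Queue attributes are passed as strings, following the SQS API conventions
    queue_url = sqs_hook.create_queue(
        queue_name='staging-file-jobs',
        attributes={
            'VisibilityTimeout': '300',
            'ReceiveMessageWaitTimeSeconds': '20'
        },
        tags={'team': 'data-platform'}
    )

    # Check the current backlog before enqueuing a new batch
    attrs = sqs_hook.get_queue_attributes(
        queue_url=queue_url,
        attribute_names=['ApproximateNumberOfMessages']
    )
    print(attrs)
    return queue_url
```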

### Event-Driven Queue Processing

Process SQS messages with custom handling and automatic message management.

```python { .api }
class SqsExecutor:
    """
    Executor for processing SQS messages in Airflow workflows.

    Parameters:
    - queue_url: str - SQS queue URL
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        queue_url: str,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None
    ): ...

    def submit_job(self, job_name: str, job_kwargs: dict) -> str:
        """Submit a job message to the SQS queue."""
        ...

    def heartbeat(self) -> None:
        """Send heartbeat for long-running operations."""
        ...
```
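
A minimal sketch of submitting jobs through the `SqsExecutor` interface defined above; its import path is not specified in this document, and the queue URL, upstream task ID, and payload shape are illustrative placeholders:

```python
def enqueue_transform_jobs(**context):
    """Submit one job message per input file via the SqsExecutor interface above."""
    # SqsExecutor as specified in this document; import path not given here
    executor = SqsExecutor(
        queue_url='https://sqs.us-west-2.amazonaws.com/123456789012/data-processing-jobs',
        aws_conn_id='aws_default',
        region_name='us-west-2'
    )

    # Upstream task id and payload keys are placeholders for illustration
    files = context['ti'].xcom_pull(task_ids='list_files') or []
    message_ids = []
    for file_path in files:
        message_ids.append(
            executor.submit_job(
                job_name='transform-file',
                job_kwargs={'file_path': file_path, 'execution_date': context['ds']}
            )
        )
        executor.heartbeat()  # keep long-running submission loops alive

    return message_ids
```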

## Usage Examples

### Event Publication with SNS

```python
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

# Publish workflow completion notification
notify_completion = SnsPublishOperator(
    task_id='notify_data_pipeline_complete',
    target_arn='arn:aws:sns:us-west-2:123456789012:data-pipeline-notifications',
    subject='Data Pipeline Completed Successfully',
    message="""
    Data pipeline execution completed successfully.

    Pipeline: {{ dag.dag_id }}
    Execution Date: {{ ds }}
    Duration: {{ (ti.end_date - ti.start_date).total_seconds() }} seconds

    All data processing tasks completed without errors.
    """,
    message_attributes={
        'pipeline_name': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        },
        'execution_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'status': {
            'DataType': 'String',
            'StringValue': 'SUCCESS'
        }
    },
    aws_conn_id='aws_default'
)
```

### Message Queue Processing

```python
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

# Send processing job to queue
queue_job = SqsPublishOperator(
    task_id='queue_processing_job',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/data-processing-jobs',
    message_content='{{ ti.xcom_pull(task_ids="prepare_job_config") | tojson }}',
    message_attributes={
        'job_type': {
            'DataType': 'String',
            'StringValue': 'batch_processing'
        },
        'priority': {
            'DataType': 'Number',
            'StringValue': '5'
        },
        'source_dag': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        }
    },
    delay_seconds=30,  # Delay processing by 30 seconds
    aws_conn_id='aws_default'
)

# Monitor job completion messages
monitor_completion = SqsSensor(
    task_id='monitor_job_completion',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/job-completion-notifications',
    max_messages=1,
    wait_time_seconds=20,  # Long polling
    message_filtering='jsonpath',
    message_filtering_config={'json_path': '$.status'},
    message_filtering_match_values=['SUCCESS', 'COMPLETED'],
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    aws_conn_id='aws_default'
)

queue_job >> monitor_completion
```

### FIFO Queue for Ordered Processing

```python
# Send ordered messages to FIFO queue
send_ordered_messages = SqsPublishOperator(
    task_id='send_file_processing_order',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/file-processing-order.fifo',
    message_content='{{ ti.xcom_pull(task_ids="list_files") | tojson }}',
    message_group_id='file-batch-{{ ds_nodash }}',  # Group by date
    message_deduplication_id='file-processing-{{ ds_nodash }}-{{ ti.try_number }}',
    message_attributes={
        'batch_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'file_count': {
            'DataType': 'Number',
            'StringValue': '{{ ti.xcom_pull(task_ids="count_files") }}'
        }
    },
    aws_conn_id='aws_default'
)
```

### Custom Message Processing

```python
import json

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def process_sqs_messages(**context):
    """Custom function to process SQS messages."""
    sqs_hook = SqsHook(aws_conn_id='aws_default')

    queue_url = 'https://sqs.us-west-2.amazonaws.com/123456789012/processing-queue'

    # Receive messages
    messages = sqs_hook.receive_message(
        queue_url=queue_url,
        max_number_of_messages=10,
        wait_time_seconds=20,
        visibility_timeout_seconds=300
    )

    processed_count = 0

    for message in messages:
        try:
            # Process message body
            message_body = json.loads(message['Body'])

            # Custom processing logic (process_data_file is user-defined)
            process_data_file(message_body['file_path'])

            # Delete message after successful processing
            sqs_hook.delete_message(
                queue_url=queue_url,
                receipt_handle=message['ReceiptHandle']
            )

            processed_count += 1

        except Exception as e:
            print(f"Error processing message: {e}")
            # Message will become visible again after the visibility timeout

    return f"Processed {processed_count} messages"

# Use with PythonOperator
process_messages = PythonOperator(
    task_id='process_queue_messages',
    python_callable=process_sqs_messages
)
```

### Multi-Channel Event Broadcasting

```python
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.sns import SnsHook

# Broadcast event to multiple SNS topics
def broadcast_event(**context):
    """Broadcast event to multiple notification channels."""
    sns_hook = SnsHook(aws_conn_id='aws_default')

    event_data = context['ti'].xcom_pull(task_ids='generate_event_data')

    # Different topics for different audiences
    topics = {
        'engineering': 'arn:aws:sns:us-west-2:123456789012:engineering-alerts',
        'operations': 'arn:aws:sns:us-west-2:123456789012:ops-notifications',
        'business': 'arn:aws:sns:us-west-2:123456789012:business-updates'
    }

    for audience, topic_arn in topics.items():
        # Customize message for each audience
        if audience == 'engineering':
            message = f"Technical Alert: {event_data['technical_details']}"
            subject = f"ALERT: {event_data['system']}"
        elif audience == 'operations':
            message = f"Operational Update: {event_data['summary']}"
            subject = f"OPS: {event_data['service']}"
        else:  # business
            message = f"Business Impact: {event_data['business_impact']}"
            subject = f"Business Update: {event_data['process']}"

        sns_hook.publish_to_target(
            target_arn=topic_arn,
            message=message,
            subject=subject,
            message_attributes={
                'event_type': {
                    'DataType': 'String',
                    'StringValue': event_data['type']
                },
                'severity': {
                    'DataType': 'String',
                    'StringValue': event_data['severity']
                }
            }
        )

    return "Event broadcast to all channels"

broadcast_task = PythonOperator(
    task_id='broadcast_event',
    python_callable=broadcast_event
)
```

### Dead Letter Queue Handling

```python
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

# Monitor dead letter queue for failed messages
monitor_dlq = SqsSensor(
    task_id='monitor_dead_letter_queue',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/processing-dlq',
    max_messages=5,
    wait_time_seconds=10,
    delete_message_on_reception=False,  # Keep for investigation
    timeout=300,  # 5 minute check
    poke_interval=60,
    mode='reschedule',  # Don't block a worker slot while waiting
    aws_conn_id='aws_default'
)

def handle_failed_messages(**context):
    """Handle messages in dead letter queue."""
    messages = context['ti'].xcom_pull(task_ids='monitor_dead_letter_queue')

    if messages:
        # Alert on failed messages; the operator is executed directly inside the callable
        alert_sns = SnsPublishOperator(
            task_id='alert_failed_messages',
            target_arn='arn:aws:sns:us-west-2:123456789012:critical-alerts',
            subject='Dead Letter Queue Alert',
            message=f'Found {len(messages)} failed messages requiring investigation',
            message_attributes={
                'alert_type': {
                    'DataType': 'String',
                    'StringValue': 'DLQ_ALERT'
                },
                'message_count': {
                    'DataType': 'Number',
                    'StringValue': str(len(messages))
                }
            }
        )
        return alert_sns.execute(context)

    return "No failed messages found"

handle_dlq = PythonOperator(
    task_id='handle_dead_letter_messages',
    python_callable=handle_failed_messages
)

monitor_dlq >> handle_dlq
```

## Import Statements

```python
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.providers.amazon.aws.hooks.sns import SnsHook
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier
```
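
The notifier imports above can be attached to DAG- or task-level callbacks. A minimal sketch, assuming the notifiers accept keyword arguments that mirror the publish parameters documented earlier; the topic ARN, queue URL, and dates are illustrative placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier

# Notify an SNS topic on DAG failure and an SQS queue on success;
# ARNs, URLs, and the start date are placeholders for illustration.
with DAG(
    dag_id='notify_on_completion',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    on_failure_callback=SnsNotifier(
        aws_conn_id='aws_default',
        target_arn='arn:aws:sns:us-west-2:123456789012:critical-alerts',
        subject='DAG failed',
        message='DAG {{ dag.dag_id }} failed on {{ ds }}'
    ),
    on_success_callback=SqsNotifier(
        aws_conn_id='aws_default',
        queue_url='https://sqs.us-west-2.amazonaws.com/123456789012/dag-events',
        message_body='{"dag_id": "{{ dag.dag_id }}", "status": "success"}'
    ),
):
    ...  # tasks go here
```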