Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Amazon Simple Notification Service (SNS) and Simple Queue Service (SQS) provide comprehensive messaging capabilities for event-driven architectures, enabling pub/sub messaging, message queuing, and asynchronous communication patterns in data pipelines.
Publish messages to SNS topics for fan-out messaging patterns and multi-subscriber notifications.
class SnsPublishOperator(AwsBaseOperator):
"""
Publish a message to Amazon SNS.
Parameters:
- target_arn: str - either a TopicArn or an EndpointArn
- message: str - the default message you want to send
- subject: str - the message subject you want to send
- message_attributes: dict - message attributes as a flat dict
- message_deduplication_id: str - unique message deduplication ID (FIFO topics only)
- message_group_id: str - message group ID (FIFO topics only)
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- verify: bool - whether to verify SSL certificates
- botocore_config: dict - botocore client configuration
Returns:
str: Message ID from SNS
"""
def __init__(
self,
*,
target_arn: str,
message: str,
subject: str = None,
message_attributes: dict = None,
message_deduplication_id: str = None,
message_group_id: str = None,
**kwargs
): ...Low-level SNS operations for topic management and message publishing.
class SnsHook(AwsBaseHook):
"""
Hook for Amazon SNS operations.
Parameters:
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- verify: bool - whether to verify SSL certificates
- botocore_config: dict - botocore client configuration
"""
def __init__(
self,
aws_conn_id: str = 'aws_default',
region_name: str = None,
verify: bool = None,
botocore_config: dict = None,
**kwargs
): ...
def publish_to_target(
self,
target_arn: str,
message: str,
subject: str = None,
message_attributes: dict = None,
message_deduplication_id: str = None,
message_group_id: str = None
) -> str:
"""
Publish a message to an SNS topic or endpoint.
Parameters:
- target_arn: str - TopicArn or EndpointArn
- message: str - message content
- subject: str - message subject
- message_attributes: dict - message attributes
- message_deduplication_id: str - deduplication ID for FIFO topics
- message_group_id: str - group ID for FIFO topics
Returns:
str: Message ID
"""
...
def create_topic(self, name: str, attributes: dict = None, tags: dict = None) -> str:
"""
Create an SNS topic.
Parameters:
- name: str - topic name
- attributes: dict - topic attributes
- tags: dict - topic tags
Returns:
str: Topic ARN
"""
...
def delete_topic(self, topic_arn: str) -> None:
"""Delete an SNS topic."""
...
def list_topics(self, next_token: str = None) -> dict:
"""List SNS topics."""
...
def get_topic_attributes(self, topic_arn: str) -> dict:
"""Get attributes for an SNS topic."""
...Send messages to SQS queues with support for standard and FIFO queues.
class SqsPublishOperator(AwsBaseOperator):
"""
Publish a message to an Amazon SQS queue.
Parameters:
- sqs_queue: str - SQS queue URL
- message_content: str - message content
- message_attributes: dict - additional message attributes
- delay_seconds: int - message delay (default: 0)
- message_group_id: str - message group ID (FIFO queues only)
- message_deduplication_id: str - deduplication ID (FIFO queues only)
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- verify: bool - whether to verify SSL certificates
- botocore_config: dict - botocore client configuration
Returns:
dict: Information about the message sent
"""
def __init__(
self,
*,
sqs_queue: str,
message_content: str,
message_attributes: dict = None,
delay_seconds: int = 0,
message_group_id: str = None,
message_deduplication_id: str = None,
**kwargs
): ...Monitor SQS queues and process messages with comprehensive polling capabilities.
class SqsSensor(BaseSensorOperator):
"""
Poll an Amazon SQS queue and process available messages.
Parameters:
- sqs_queue: str - SQS queue URL
- max_messages: int - maximum number of messages to receive (1-10)
- num_batches: int - number of batches to process
- wait_time_seconds: int - long polling wait time (0-20 seconds)
- visibility_timeout_seconds: int - message visibility timeout
- message_filtering: str - filtering method for messages
- message_filtering_match_values: list - values to match for filtering
- message_filtering_config: dict - advanced filtering configuration
- delete_message_on_reception: bool - delete message after receiving
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
Returns:
list: Received messages
"""
def __init__(
self,
sqs_queue: str,
max_messages: int = 5,
num_batches: int = 1,
wait_time_seconds: int = 1,
visibility_timeout_seconds: int = None,
message_filtering: str = None,
message_filtering_match_values: list = None,
message_filtering_config: dict = None,
delete_message_on_reception: bool = True,
**kwargs
): ...Comprehensive SQS operations for queue management and message handling.
class SqsHook(AwsBaseHook):
"""
Hook for Amazon SQS operations.
Parameters:
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- verify: bool - whether to verify SSL certificates
- botocore_config: dict - botocore client configuration
"""
def __init__(
self,
aws_conn_id: str = 'aws_default',
region_name: str = None,
verify: bool = None,
botocore_config: dict = None,
**kwargs
): ...
def send_message(
self,
queue_url: str,
message_body: str,
delay_seconds: int = 0,
message_attributes: dict = None,
message_group_id: str = None,
message_deduplication_id: str = None
) -> dict:
"""
Send a message to an SQS queue.
Parameters:
- queue_url: str - SQS queue URL
- message_body: str - message content
- delay_seconds: int - delivery delay
- message_attributes: dict - message attributes
- message_group_id: str - group ID for FIFO queues
- message_deduplication_id: str - deduplication ID for FIFO queues
Returns:
dict: Send message response
"""
...
def receive_message(
self,
queue_url: str,
max_number_of_messages: int = 1,
wait_time_seconds: int = 0,
visibility_timeout_seconds: int = None,
message_attribute_names: list = None,
receive_request_attempt_id: str = None
) -> list[dict]:
"""
Receive messages from an SQS queue.
Parameters:
- queue_url: str - SQS queue URL
- max_number_of_messages: int - maximum messages to receive (1-10)
- wait_time_seconds: int - long polling wait time (0-20)
- visibility_timeout_seconds: int - message visibility timeout
- message_attribute_names: list - attributes to retrieve
- receive_request_attempt_id: str - deduplication ID for FIFO queues
Returns:
list: Received messages
"""
...
def delete_message(self, queue_url: str, receipt_handle: str) -> dict:
"""Delete a message from an SQS queue."""
...
def create_queue(
self,
queue_name: str,
attributes: dict = None,
tags: dict = None
) -> str:
"""
Create an SQS queue.
Parameters:
- queue_name: str - queue name
- attributes: dict - queue attributes
- tags: dict - queue tags
Returns:
str: Queue URL
"""
...
def delete_queue(self, queue_url: str) -> dict:
"""Delete an SQS queue."""
...
def get_queue_attributes(self, queue_url: str, attribute_names: list = None) -> dict:
"""Get attributes for an SQS queue."""
...
def purge_queue(self, queue_url: str) -> dict:
"""Purge all messages from an SQS queue."""
...Process SQS messages with custom handling and automatic message management.
class SqsExecutor:
    """
    Executor for processing SQS messages in Airflow workflows.

    Parameters:
    - queue_url: str - SQS queue URL
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        queue_url: str,
        aws_conn_id: str = 'aws_default',
        # None-defaulted parameter is explicitly Optional (PEP 484).
        region_name: str | None = None
    ): ...

    def submit_job(self, job_name: str, job_kwargs: dict) -> str:
        """Submit a job message to the SQS queue."""
        ...

    def heartbeat(self) -> None:
        """Send heartbeat for long-running operations."""
        ...

from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
# Publish workflow completion notification.
# Jinja fields ({{ dag.dag_id }}, {{ ds }}, ...) are rendered by Airflow at
# runtime from the task's template context.
notify_completion = SnsPublishOperator(
    task_id='notify_data_pipeline_complete',
    target_arn='arn:aws:sns:us-west-2:123456789012:data-pipeline-notifications',
    subject='Data Pipeline Completed Successfully',
    message="""
    Data pipeline execution completed successfully.
    Pipeline: {{ dag.dag_id }}
    Execution Date: {{ ds }}
    Duration: {{ (ti.end_date - ti.start_date).total_seconds() }} seconds
    All data processing tasks completed without errors.
    """,
    # SNS message attributes use the boto3 shape: DataType + StringValue.
    message_attributes={
        'pipeline_name': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        },
        'execution_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'status': {
            'DataType': 'String',
            'StringValue': 'SUCCESS'
        }
    },
    aws_conn_id='aws_default'
)

from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

# Send processing job to queue
queue_job = SqsPublishOperator(
    task_id='queue_processing_job',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/data-processing-jobs',
    message_content='{{ ti.xcom_pull(task_ids="prepare_job_config") | tojson }}',
    message_attributes={
        'job_type': {
            'DataType': 'String',
            'StringValue': 'batch_processing'
        },
        # Number attributes are still passed as strings per the SQS API.
        'priority': {
            'DataType': 'Number',
            'StringValue': '5'
        },
        'source_dag': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        }
    },
    delay_seconds=30,  # Delay processing by 30 seconds
    aws_conn_id='aws_default'
)

# Monitor job completion messages
monitor_completion = SqsSensor(
    task_id='monitor_job_completion',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/job-completion-notifications',
    max_messages=1,
    wait_time_seconds=20,  # Long polling
    # Filter received messages by JSONPath before the sensor succeeds.
    message_filtering='jsonpath',
    message_filtering_match_values=['SUCCESS'],
    message_filtering_config={
        'json_path': '$.status',
        'match_values': ['SUCCESS', 'COMPLETED']
    },
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    aws_conn_id='aws_default'
)

queue_job >> monitor_completion

# Send ordered messages to FIFO queue
# FIFO queues require a message group ID; the deduplication ID here includes
# the try number so a task retry is not silently dropped as a duplicate.
send_ordered_messages = SqsPublishOperator(
    task_id='send_file_processing_order',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/file-processing-order.fifo',
    message_content='{{ ti.xcom_pull(task_ids="list_files") | tojson }}',
    message_group_id='file-batch-{{ ds_nodash }}',  # Group by date
    message_deduplication_id='file-processing-{{ ds_nodash }}-{{ ti.try_number }}',
    message_attributes={
        'batch_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'file_count': {
            'DataType': 'Number',
            'StringValue': '{{ ti.xcom_pull(task_ids="count_files") }}'
        }
    },
    aws_conn_id='aws_default'
)

from airflow.providers.amazon.aws.hooks.sqs import SqsHook
import json  # Needed by process_sqs_messages; missing from the original snippet.

def process_sqs_messages(**context):
    """Receive messages from an SQS queue, process each one, and delete it on success.

    Returns a summary string with the number of successfully processed messages.
    Failed messages are left on the queue and become visible again once the
    visibility timeout (300 s) expires.
    """
    sqs_hook = SqsHook(aws_conn_id='aws_default')
    queue_url = 'https://sqs.us-west-2.amazonaws.com/123456789012/processing-queue'
    # Receive up to 10 messages using long polling (20 s wait).
    messages = sqs_hook.receive_message(
        queue_url=queue_url,
        max_number_of_messages=10,
        wait_time_seconds=20,
        visibility_timeout_seconds=300
    )
    processed_count = 0
    for message in messages:
        try:
            # Message bodies are JSON documents; assumes a 'file_path' key —
            # TODO confirm against the producer.
            message_body = json.loads(message['Body'])
            # Custom processing logic; process_data_file must be defined
            # elsewhere in the DAG file.
            process_data_file(message_body['file_path'])
            # Delete message only after successful processing.
            sqs_hook.delete_message(
                queue_url=queue_url,
                receipt_handle=message['ReceiptHandle']
            )
            processed_count += 1
        except Exception as e:
            print(f"Error processing message: {e}")
            # Message will become visible again after visibility timeout
    return f"Processed {processed_count} messages"

# Use with PythonOperator
process_messages = PythonOperator(
    task_id='process_queue_messages',
    python_callable=process_sqs_messages
)

# Broadcast event to multiple SNS topics
def broadcast_event(**context):
    """Broadcast one event to multiple audience-specific SNS topics."""
    from airflow.providers.amazon.aws.hooks.sns import SnsHook
    sns_hook = SnsHook(aws_conn_id='aws_default')
    # NOTE(review): assumes event_data is a dict containing 'type', 'severity'
    # and the per-audience keys used below — confirm against the upstream task.
    event_data = context['ti'].xcom_pull(task_ids='generate_event_data')
    # Different topics for different audiences
    topics = {
        'engineering': 'arn:aws:sns:us-west-2:123456789012:engineering-alerts',
        'operations': 'arn:aws:sns:us-west-2:123456789012:ops-notifications',
        'business': 'arn:aws:sns:us-west-2:123456789012:business-updates'
    }
    for audience, topic_arn in topics.items():
        # Customize message for each audience
        if audience == 'engineering':
            message = f"Technical Alert: {event_data['technical_details']}"
            subject = f"ALERT: {event_data['system']}"
        elif audience == 'operations':
            message = f"Operational Update: {event_data['summary']}"
            subject = f"OPS: {event_data['service']}"
        else:  # business
            message = f"Business Impact: {event_data['business_impact']}"
            subject = f"Business Update: {event_data['process']}"
        sns_hook.publish_to_target(
            target_arn=topic_arn,
            message=message,
            subject=subject,
            message_attributes={
                'event_type': {
                    'DataType': 'String',
                    'StringValue': event_data['type']
                },
                'severity': {
                    'DataType': 'String',
                    'StringValue': event_data['severity']
                }
            }
        )
    return "Event broadcasted to all channels"

broadcast_task = PythonOperator(
    task_id='broadcast_event',
    python_callable=broadcast_event
)

# Monitor dead letter queue for failed messages
monitor_dlq = SqsSensor(
    task_id='monitor_dead_letter_queue',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/processing-dlq',
    max_messages=5,
    wait_time_seconds=10,
    delete_message_on_reception=False,  # Keep for investigation
    timeout=300,  # 5 minute check
    poke_interval=60,
    mode='reschedule',  # Don't block worker
    aws_conn_id='aws_default'
)

def handle_failed_messages(**context):
    """Handle messages in dead letter queue."""
    # NOTE(review): SqsSensor pushes received messages to XCom; confirm the
    # XCom key this pull uses matches what the sensor publishes.
    messages = context['ti'].xcom_pull(task_ids='monitor_dead_letter_queue')
    if messages:
        # Alert on failed messages.
        # NOTE(review): instantiating an operator inside a callable and calling
        # execute() directly is unusual — consider SnsHook.publish_to_target.
        alert_sns = SnsPublishOperator(
            task_id='alert_failed_messages',
            target_arn='arn:aws:sns:us-west-2:123456789012:critical-alerts',
            subject='Dead Letter Queue Alert',
            message=f'Found {len(messages)} failed messages requiring investigation',
            message_attributes={
                'alert_type': {
                    'DataType': 'String',
                    'StringValue': 'DLQ_ALERT'
                },
                'message_count': {
                    'DataType': 'Number',
                    'StringValue': str(len(messages))
                }
            }
        )
        return alert_sns.execute(context)
    return "No failed messages found"

handle_dlq = PythonOperator(
    task_id='handle_dead_letter_messages',
    python_callable=handle_failed_messages
)

monitor_dlq >> handle_dlq

from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.providers.amazon.aws.hooks.sns import SnsHook
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon