
tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package with AWS service integrations for orchestrating cloud workflows and data pipelines

docs/messaging-sns-sqs.md

Amazon SNS/SQS Messaging Services

Amazon Simple Notification Service (SNS) and Simple Queue Service (SQS) provide the core messaging building blocks for event-driven architectures: pub/sub fan-out, durable message queuing, and asynchronous communication patterns in data pipelines.

Capabilities

SNS Publication and Notification

Publish messages to SNS topics for fan-out messaging patterns and multi-subscriber notifications.

class SnsPublishOperator(AwsBaseOperator):
    """
    Publish a message to Amazon SNS.
    
    Parameters:
    - target_arn: str - either a TopicArn or an EndpointArn
    - message: str - the default message you want to send
    - subject: str - the message subject you want to send
    - message_attributes: dict - message attributes as a flat dict
    - message_deduplication_id: str - unique message deduplication ID (FIFO topics only)
    - message_group_id: str - message group ID (FIFO topics only)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    
    Returns:
    str: Message ID from SNS
    """
    def __init__(
        self,
        *,
        target_arn: str,
        message: str,
        subject: str = None,
        message_attributes: dict = None,
        message_deduplication_id: str = None,
        message_group_id: str = None,
        **kwargs
    ): ...

SNS Service Hook

Low-level SNS operations for topic management and message publishing.

class SnsHook(AwsBaseHook):
    """
    Hook for Amazon SNS operations.
    
    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str = None,
        verify: bool = None,
        botocore_config: dict = None,
        **kwargs
    ): ...
    
    def publish_to_target(
        self,
        target_arn: str,
        message: str,
        subject: str = None,
        message_attributes: dict = None,
        message_deduplication_id: str = None,
        message_group_id: str = None
    ) -> str:
        """
        Publish a message to an SNS topic or endpoint.
        
        Parameters:
        - target_arn: str - TopicArn or EndpointArn
        - message: str - message content
        - subject: str - message subject
        - message_attributes: dict - message attributes
        - message_deduplication_id: str - deduplication ID for FIFO topics
        - message_group_id: str - group ID for FIFO topics
        
        Returns:
        str: Message ID
        """
        ...
    
    def create_topic(self, name: str, attributes: dict = None, tags: dict = None) -> str:
        """
        Create an SNS topic.
        
        Parameters:
        - name: str - topic name
        - attributes: dict - topic attributes
        - tags: dict - topic tags
        
        Returns:
        str: Topic ARN
        """
        ...
    
    def delete_topic(self, topic_arn: str) -> None:
        """Delete an SNS topic."""
        ...
    
    def list_topics(self, next_token: str = None) -> dict:
        """List SNS topics."""
        ...
    
    def get_topic_attributes(self, topic_arn: str) -> dict:
        """Get attributes for an SNS topic."""
        ...

SQS Message Publishing

Send messages to SQS queues with support for standard and FIFO queues.

class SqsPublishOperator(AwsBaseOperator):
    """
    Publish a message to an Amazon SQS queue.
    
    Parameters:
    - sqs_queue: str - SQS queue URL
    - message_content: str - message content
    - message_attributes: dict - additional message attributes
    - delay_seconds: int - message delay (default: 0)
    - message_group_id: str - message group ID (FIFO queues only)
    - message_deduplication_id: str - deduplication ID (FIFO queues only)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    
    Returns:
    dict: Information about the message sent
    """
    def __init__(
        self,
        *,
        sqs_queue: str,
        message_content: str,
        message_attributes: dict = None,
        delay_seconds: int = 0,
        message_group_id: str = None,
        message_deduplication_id: str = None,
        **kwargs
    ): ...

SQS Message Processing and Sensing

Monitor SQS queues and process messages with configurable long polling, batching, and message filtering.

class SqsSensor(BaseSensorOperator):
    """
    Poll an Amazon SQS queue and process available messages.
    
    Parameters:
    - sqs_queue: str - SQS queue URL
    - max_messages: int - maximum number of messages to receive (1-10)
    - num_batches: int - number of batches to process
    - wait_time_seconds: int - long polling wait time (0-20 seconds)
    - visibility_timeout_seconds: int - message visibility timeout
    - message_filtering: str - filtering method for messages
    - message_filtering_match_values: list - values to match for filtering
    - message_filtering_config: dict - advanced filtering configuration
    - delete_message_on_reception: bool - delete message after receiving
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    
    Returns:
    list: Received messages
    """
    def __init__(
        self,
        sqs_queue: str,
        max_messages: int = 5,
        num_batches: int = 1,
        wait_time_seconds: int = 1,
        visibility_timeout_seconds: int = None,
        message_filtering: str = None,
        message_filtering_match_values: list = None,
        message_filtering_config: dict = None,
        delete_message_on_reception: bool = True,
        **kwargs
    ): ...

SQS Service Hook

Low-level SQS operations for queue management and message handling.

class SqsHook(AwsBaseHook):
    """
    Hook for Amazon SQS operations.
    
    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials  
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str = None,
        verify: bool = None,
        botocore_config: dict = None,
        **kwargs
    ): ...
    
    def send_message(
        self,
        queue_url: str,
        message_body: str,
        delay_seconds: int = 0,
        message_attributes: dict = None,
        message_group_id: str = None,
        message_deduplication_id: str = None
    ) -> dict:
        """
        Send a message to an SQS queue.
        
        Parameters:
        - queue_url: str - SQS queue URL
        - message_body: str - message content
        - delay_seconds: int - delivery delay
        - message_attributes: dict - message attributes
        - message_group_id: str - group ID for FIFO queues
        - message_deduplication_id: str - deduplication ID for FIFO queues
        
        Returns:
        dict: Send message response
        """
        ...
    
    def receive_message(
        self,
        queue_url: str,
        max_number_of_messages: int = 1,
        wait_time_seconds: int = 0,
        visibility_timeout_seconds: int = None,
        message_attribute_names: list = None,
        receive_request_attempt_id: str = None
    ) -> list[dict]:
        """
        Receive messages from an SQS queue.
        
        Parameters:
        - queue_url: str - SQS queue URL
        - max_number_of_messages: int - maximum messages to receive (1-10)
        - wait_time_seconds: int - long polling wait time (0-20)
        - visibility_timeout_seconds: int - message visibility timeout
        - message_attribute_names: list - attributes to retrieve
        - receive_request_attempt_id: str - deduplication ID for FIFO queues
        
        Returns:
        list: Received messages
        """
        ...
    
    def delete_message(self, queue_url: str, receipt_handle: str) -> dict:
        """Delete a message from an SQS queue."""
        ...
    
    def create_queue(
        self,
        queue_name: str,
        attributes: dict = None,
        tags: dict = None
    ) -> str:
        """
        Create an SQS queue.
        
        Parameters:
        - queue_name: str - queue name
        - attributes: dict - queue attributes
        - tags: dict - queue tags
        
        Returns:
        str: Queue URL
        """
        ...
    
    def delete_queue(self, queue_url: str) -> dict:
        """Delete an SQS queue."""
        ...
    
    def get_queue_attributes(self, queue_url: str, attribute_names: list = None) -> dict:
        """Get attributes for an SQS queue."""
        ...
    
    def purge_queue(self, queue_url: str) -> dict:
        """Purge all messages from an SQS queue."""
        ...
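The `attributes` dict passed to `create_queue` follows the SQS CreateQueue API's string-valued attribute names. A common configuration pairs a FIFO queue with a dead letter queue via a JSON-encoded `RedrivePolicy`; the sketch below shows the shape of that dict (the DLQ ARN is a placeholder):

```python
import json

# Sketch: attribute dict for SqsHook.create_queue. Attribute names follow the
# SQS CreateQueue API; all values must be strings. The ARN is a placeholder.
dlq_arn = 'arn:aws:sqs:us-west-2:123456789012:processing-dlq'

fifo_attributes = {
    'FifoQueue': 'true',                  # queue name must also end in .fifo
    'ContentBasedDeduplication': 'true',  # derive dedup IDs from message bodies
    'VisibilityTimeout': '300',
    # Move messages to the DLQ after 3 failed receives
    'RedrivePolicy': json.dumps({
        'deadLetterTargetArn': dlq_arn,
        'maxReceiveCount': '3',
    }),
}
```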

Event-Driven Queue Processing

Process SQS messages with custom handling and automatic message management.

class SqsExecutor:
    """
    Executor for processing SQS messages in Airflow workflows.
    
    Parameters:
    - queue_url: str - SQS queue URL
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        queue_url: str,
        aws_conn_id: str = 'aws_default',
        region_name: str = None
    ): ...
    
    def submit_job(self, job_name: str, job_kwargs: dict) -> str:
        """Submit a job message to the SQS queue."""
        ...
    
    def heartbeat(self) -> None:
        """Send heartbeat for long-running operations."""
        ...

Usage Examples

Event Publication with SNS

from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

# Publish workflow completion notification
notify_completion = SnsPublishOperator(
    task_id='notify_data_pipeline_complete',
    target_arn='arn:aws:sns:us-west-2:123456789012:data-pipeline-notifications',
    subject='Data Pipeline Completed Successfully',
    message="""
    Data pipeline execution completed successfully.
    
    Pipeline: {{ dag.dag_id }}
    Execution Date: {{ ds }}
    Start Time: {{ ti.start_date }}
    
    All data processing tasks completed without errors.
    """,
    message_attributes={
        'pipeline_name': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        },
        'execution_date': {
            'DataType': 'String', 
            'StringValue': '{{ ds }}'
        },
        'status': {
            'DataType': 'String',
            'StringValue': 'SUCCESS'
        }
    },
    aws_conn_id='aws_default'
)

Message Queue Processing

from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

# Send processing job to queue
queue_job = SqsPublishOperator(
    task_id='queue_processing_job',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/data-processing-jobs',
    message_content='{{ ti.xcom_pull(task_ids="prepare_job_config") | tojson }}',
    message_attributes={
        'job_type': {
            'DataType': 'String',
            'StringValue': 'batch_processing'
        },
        'priority': {
            'DataType': 'Number',
            'StringValue': '5'
        },
        'source_dag': {
            'DataType': 'String',
            'StringValue': '{{ dag.dag_id }}'
        }
    },
    delay_seconds=30,  # Delay processing by 30 seconds
    aws_conn_id='aws_default'
)

# Monitor job completion messages
monitor_completion = SqsSensor(
    task_id='monitor_job_completion',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/job-completion-notifications',
    max_messages=1,
    wait_time_seconds=20,  # Long polling
    message_filtering='jsonpath',
    message_filtering_config='$.status',  # JSONPath expression evaluated against each message body
    message_filtering_match_values=['SUCCESS', 'COMPLETED'],
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    aws_conn_id='aws_default'
)

queue_job >> monitor_completion
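Conceptually, JSONPath filtering parses each message body as JSON, evaluates the expression against it, and keeps only messages whose extracted value appears in the match values. The sketch below mimics that behavior with a hardcoded key lookup standing in for a real JSONPath library:

```python
import json

# Sketch of the filtering applied when message_filtering='jsonpath': extract a
# value from each JSON body and keep messages whose value is in match_values.
# A plain dict lookup stands in for the '$.status' JSONPath expression here.
def filter_messages(messages: list, match_values: list) -> list:
    kept = []
    for message in messages:
        body = json.loads(message['Body'])
        if body.get('status') in match_values:
            kept.append(message)
    return kept

messages = [
    {'Body': json.dumps({'status': 'SUCCESS', 'job': 'a'})},
    {'Body': json.dumps({'status': 'FAILED', 'job': 'b'})},
]
matched = filter_messages(messages, ['SUCCESS', 'COMPLETED'])
```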

FIFO Queue for Ordered Processing

# Send ordered messages to FIFO queue
send_ordered_messages = SqsPublishOperator(
    task_id='send_file_processing_order',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/file-processing-order.fifo',
    message_content='{{ ti.xcom_pull(task_ids="list_files") | tojson }}',
    message_group_id='file-batch-{{ ds_nodash }}',  # Group by date
    message_deduplication_id='file-processing-{{ ds_nodash }}-{{ ti.try_number }}',
    message_attributes={
        'batch_date': {
            'DataType': 'String',
            'StringValue': '{{ ds }}'
        },
        'file_count': {
            'DataType': 'Number',
            'StringValue': '{{ ti.xcom_pull(task_ids="count_files") }}'
        }
    },
    aws_conn_id='aws_default'
)
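When a FIFO queue has `ContentBasedDeduplication` enabled, an explicit `message_deduplication_id` is unnecessary: SQS derives it as a SHA-256 hash of the message body. Computing the same hash client-side, as in this sketch, shows why identical bodies sent within the five-minute deduplication window collapse to a single delivery:

```python
import hashlib

# Sketch: reproduce SQS's content-based deduplication ID, which is a SHA-256
# hash of the message body. Identical bodies yield identical IDs.
def content_based_dedup_id(message_body: str) -> str:
    return hashlib.sha256(message_body.encode('utf-8')).hexdigest()

dedup_a = content_based_dedup_id('{"files": ["a.csv", "b.csv"]}')
dedup_b = content_based_dedup_id('{"files": ["a.csv", "b.csv"]}')
```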

Custom Message Processing

import json

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.sqs import SqsHook

def process_sqs_messages(**context):
    """Custom function to process SQS messages."""
    sqs_hook = SqsHook(aws_conn_id='aws_default')
    
    queue_url = 'https://sqs.us-west-2.amazonaws.com/123456789012/processing-queue'
    
    # Receive messages
    messages = sqs_hook.receive_message(
        queue_url=queue_url,
        max_number_of_messages=10,
        wait_time_seconds=20,
        visibility_timeout_seconds=300
    )
    
    processed_count = 0
    
    for message in messages:
        try:
            # Process message body
            message_body = json.loads(message['Body'])
            
            # Custom processing logic
            process_data_file(message_body['file_path'])  # user-defined processing step
            
            # Delete message after successful processing
            sqs_hook.delete_message(
                queue_url=queue_url,
                receipt_handle=message['ReceiptHandle']
            )
            
            processed_count += 1
            
        except Exception as e:
            print(f"Error processing message: {e}")
            # Message will become visible again after visibility timeout
    
    return f"Processed {processed_count} messages"

# Use with PythonOperator
process_messages = PythonOperator(
    task_id='process_queue_messages',
    python_callable=process_sqs_messages
)
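Deleting messages one at a time, as above, costs one API call per message. The SQS DeleteMessageBatch API accepts up to 10 entries per call, so receipt handles must be chunked; a sketch of that chunking (the helper name is illustrative):

```python
# Sketch: SQS DeleteMessageBatch accepts at most 10 entries per call, so
# receipt handles are chunked into batches. Entry shape follows the batch API.
def delete_batch_entries(messages: list, batch_size: int = 10) -> list:
    entries = [
        {'Id': str(i), 'ReceiptHandle': m['ReceiptHandle']}
        for i, m in enumerate(messages)
    ]
    return [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]

batches = delete_batch_entries([{'ReceiptHandle': f'rh-{n}'} for n in range(23)])
```

Each batch can then be passed to the underlying boto3 client's `delete_message_batch` call.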

Multi-Channel Event Broadcasting

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.sns import SnsHook

# Broadcast event to multiple SNS topics
def broadcast_event(**context):
    """Broadcast event to multiple notification channels."""
    
    sns_hook = SnsHook(aws_conn_id='aws_default')
    
    event_data = context['ti'].xcom_pull(task_ids='generate_event_data')
    
    # Different topics for different audiences
    topics = {
        'engineering': 'arn:aws:sns:us-west-2:123456789012:engineering-alerts',
        'operations': 'arn:aws:sns:us-west-2:123456789012:ops-notifications',
        'business': 'arn:aws:sns:us-west-2:123456789012:business-updates'
    }
    
    for audience, topic_arn in topics.items():
        # Customize message for each audience
        if audience == 'engineering':
            message = f"Technical Alert: {event_data['technical_details']}"
            subject = f"ALERT: {event_data['system']}"
        elif audience == 'operations':
            message = f"Operational Update: {event_data['summary']}"
            subject = f"OPS: {event_data['service']}"
        else:  # business
            message = f"Business Impact: {event_data['business_impact']}"
            subject = f"Business Update: {event_data['process']}"
        
        sns_hook.publish_to_target(
            target_arn=topic_arn,
            message=message,
            subject=subject,
            message_attributes={
                'event_type': {
                    'DataType': 'String',
                    'StringValue': event_data['type']
                },
                'severity': {
                    'DataType': 'String',
                    'StringValue': event_data['severity']
                }
            }
        )
    
    return "Event broadcasted to all channels"

broadcast_task = PythonOperator(
    task_id='broadcast_event',
    python_callable=broadcast_event
)

Dead Letter Queue Handling

# Monitor dead letter queue for failed messages
monitor_dlq = SqsSensor(
    task_id='monitor_dead_letter_queue',
    sqs_queue='https://sqs.us-west-2.amazonaws.com/123456789012/processing-dlq',
    max_messages=5,
    wait_time_seconds=10,
    delete_message_on_reception=False,  # Keep for investigation
    timeout=300,  # 5 minute check
    poke_interval=60,
    mode='reschedule',  # Don't block worker
    aws_conn_id='aws_default'
)

def handle_failed_messages(**context):
    """Handle messages in dead letter queue."""
    messages = context['ti'].xcom_pull(task_ids='monitor_dead_letter_queue', key='messages')
    
    if messages:
        # Alert on failed messages
        alert_sns = SnsPublishOperator(
            task_id='alert_failed_messages',
            target_arn='arn:aws:sns:us-west-2:123456789012:critical-alerts',
            subject='Dead Letter Queue Alert',
            message=f'Found {len(messages)} failed messages requiring investigation',
            message_attributes={
                'alert_type': {
                    'DataType': 'String',
                    'StringValue': 'DLQ_ALERT'
                },
                'message_count': {
                    'DataType': 'Number', 
                    'StringValue': str(len(messages))
                }
            }
        )
        
        return alert_sns.execute(context)
    
    return "No failed messages found"

handle_dlq = PythonOperator(
    task_id='handle_dead_letter_messages',
    python_callable=handle_failed_messages
)

monitor_dlq >> handle_dlq
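When triaging DLQ messages, the `ApproximateReceiveCount` system attribute (returned when the corresponding attribute name is requested on receive) tells you how many delivery attempts each message survived before being redriven. A sketch of summarizing failed messages by that count (helper name is illustrative):

```python
# Sketch: triage DLQ messages by how many times each was received before
# failing. ApproximateReceiveCount is a standard SQS message system attribute.
def summarize_receive_counts(messages: list) -> dict:
    summary = {}
    for message in messages:
        count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 0))
        summary[count] = summary.get(count, 0) + 1
    return summary

summary = summarize_receive_counts([
    {'Attributes': {'ApproximateReceiveCount': '3'}},
    {'Attributes': {'ApproximateReceiveCount': '3'}},
    {'Attributes': {'ApproximateReceiveCount': '5'}},
])
```

A cluster of high receive counts usually points at a poison message; counts at exactly the queue's `maxReceiveCount` suggest a transient downstream failure.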

Import Statements

from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.providers.amazon.aws.hooks.sns import SnsHook
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
