tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Amazon DynamoDB NoSQL Database

Amazon DynamoDB provides managed NoSQL database operations with support for batch data operations, import/export functionality, and seamless integration with S3 and other AWS services for data pipeline workflows.

Capabilities

Batch Data Operations

Efficiently write large volumes of data to DynamoDB tables with automatic retry and error handling.

class DynamoDBHook(AwsBaseHook):
    """
    Hook for Amazon DynamoDB operations.
    
    Parameters:
    - table_keys: list - partition key and sort key for the table
    - table_name: str - target DynamoDB table name
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    """
    def __init__(
        self,
        table_keys: list | None = None,
        table_name: str | None = None,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs
    ): ...
    
    def write_batch_data(self, items: Iterable) -> bool:
        """
        Write batch items to DynamoDB table with provisioned throughput capacity.
        
        Parameters:
        - items: Iterable - list of DynamoDB items to write
        
        Returns:
        bool: True if batch write succeeds
        """
        ...
    
    def get_import_status(self, import_arn: str) -> tuple[str, str, str]:
        """
        Get import status from DynamoDB.
        
        Parameters:
        - import_arn: str - Amazon Resource Name (ARN) for the import
        
        Returns:
        tuple: Import status, error code, error message
        """
        ...
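Under the hood, `write_batch_data` hands items to boto3's batch writer, which splits them into `BatchWriteItem` calls of at most 25 items each and resubmits unprocessed items. A plain-Python sketch of that chunking (`chunk_items` is an illustrative helper, not part of the hook):

```python
from itertools import islice

def chunk_items(items, batch_size=25):
    """Yield successive batches, each no larger than DynamoDB's
    25-item BatchWriteItem limit (illustrative helper)."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch

# 60 items become three requests of 25, 25, and 10 items
batch_sizes = [len(batch) for batch in chunk_items(range(60))]
```

Because the hook performs this batching itself, callers can pass the full item list in one `write_batch_data` call.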

Value Monitoring and Sensing

Monitor DynamoDB table attributes and wait for specific values or conditions.

class DynamoDBValueSensor(AwsBaseSensor):
    """
    Wait for an attribute value to be present for an item in a DynamoDB table.
    
    Parameters:
    - table_name: str - DynamoDB table name
    - partition_key_name: str - DynamoDB partition key name
    - partition_key_value: str - DynamoDB partition key value
    - attribute_name: str - DynamoDB attribute name to monitor
    - attribute_value: Any - expected DynamoDB attribute value
    - sort_key_name: str - DynamoDB sort key name (optional)
    - sort_key_value: str - DynamoDB sort key value (optional)
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - poke_interval: int - time in seconds between checks
    - timeout: int - maximum time to wait
    - mode: str - sensor mode ('poke' or 'reschedule')
    
    Returns:
    bool: True when attribute value matches expected value
    """
    def __init__(
        self,
        table_name: str,
        partition_key_name: str,
        partition_key_value: str,
        attribute_name: str,
        attribute_value: Any,
        sort_key_name: str | None = None,
        sort_key_value: str | None = None,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs
    ): ...
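The sensor fetches the item by its primary key and compares a single attribute against the expected value; the key it sends includes the sort key only when one is configured. A minimal sketch of that key assembly (`build_key` is a hypothetical helper for illustration, not provider API):

```python
def build_key(partition_key_name, partition_key_value,
              sort_key_name=None, sort_key_value=None):
    """Assemble the DynamoDB primary-key dict, adding the sort key
    only when one is configured (hypothetical helper)."""
    key = {partition_key_name: partition_key_value}
    if sort_key_name is not None:
        key[sort_key_name] = sort_key_value
    return key

# Partition key only
job_key = build_key('job_id', 'batch_job_001')
# Composite primary key (partition + sort)
run_key = build_key('job_id', 'batch_job_001', 'run_date', '2023-01-15')
```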

S3 Integration and Data Transfers

Transfer data between DynamoDB and S3 for backup, analytics, and data pipeline operations.

class S3ToDynamoDBOperator(BaseOperator):
    """
    Load data from S3 to DynamoDB table.
    
    Parameters:
    - s3_bucket: str - S3 bucket name containing source data
    - s3_key: str - S3 key path to source data
    - dynamodb_table: str - target DynamoDB table name
    - table_keys: list - partition key and sort key configuration
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - input_format: str - input data format ('CSV', 'DYNAMODB_JSON', 'ION')
    - input_compression: str - compression type ('GZIP', 'ZSTD', 'NONE')
    
    Returns:
    str: Import job ARN
    """
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        dynamodb_table: str,
        table_keys: list | None = None,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        input_format: str = 'DYNAMODB_JSON',
        input_compression: str = 'NONE',
        **kwargs
    ): ...

class DynamoDBToS3Operator(BaseOperator):
    """
    Export data from DynamoDB table to S3.
    
    Parameters:
    - dynamodb_table_name: str - source DynamoDB table name
    - s3_bucket_name: str - target S3 bucket name
    - s3_key_prefix: str - S3 key prefix for exported data
    - dynamodb_scan_kwargs: dict - additional scan parameters
    - s3_key_prefix_datetime_format: str - datetime format for S3 key prefix
    - process_func: Callable - function to process items before export
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    
    Returns:
    str: S3 export location
    """
    def __init__(
        self,
        dynamodb_table_name: str,
        s3_bucket_name: str,
        s3_key_prefix: str = '',
        dynamodb_scan_kwargs: dict | None = None,
        s3_key_prefix_datetime_format: str = '%Y/%m/%d',
        process_func: Callable | None = None,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs
    ): ...

Hive Integration

Transfer data between Apache Hive and DynamoDB for data warehouse integration.

class HiveToDynamoDBOperator(BaseOperator):
    """
    Move data from Hive to DynamoDB.
    
    Parameters:
    - sql: str - SQL query to extract data from Hive
    - table_name: str - target DynamoDB table name
    - table_keys: list - partition key and sort key configuration
    - pre_process: Callable - function to preprocess data before insertion
    - pre_process_args: tuple - arguments for preprocessing function
    - pre_process_kwargs: dict - keyword arguments for preprocessing function
    - region_name: str - AWS region name
    - schema: str - Hive database schema
    - hiveserver2_conn_id: str - Airflow connection for Hive
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    bool: True if transfer succeeds
    """
    def __init__(
        self,
        sql: str,
        table_name: str,
        table_keys: list | None = None,
        pre_process: Callable | None = None,
        pre_process_args: tuple | None = None,
        pre_process_kwargs: dict | None = None,
        region_name: str | None = None,
        schema: str = 'default',
        hiveserver2_conn_id: str = 'hiveserver2_default',
        aws_conn_id: str = 'aws_default',
        **kwargs
    ): ...

Usage Examples

Batch Data Loading

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

def load_user_data(**context):
    """Load user data into DynamoDB."""
    dynamodb_hook = DynamoDBHook(
        table_name='users',
        table_keys=['user_id'],  # Partition key
        aws_conn_id='aws_default'
    )
    
    # Sample user data
    users = [
        {
            'user_id': '001',
            'name': 'Alice Johnson',
            'email': 'alice@example.com',
            'tier': 'premium',
            'signup_date': '2023-01-15'
        },
        {
            'user_id': '002', 
            'name': 'Bob Smith',
            'email': 'bob@example.com',
            'tier': 'standard',
            'signup_date': '2023-02-20'
        }
    ]
    
    # Write batch data
    success = dynamodb_hook.write_batch_data(users)
    return f"Batch write success: {success}"

# Use with PythonOperator
load_users = PythonOperator(
    task_id='load_user_data',
    python_callable=load_user_data
)

Data Pipeline with Value Monitoring

from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator

# Wait for processing status to be complete
wait_for_completion = DynamoDBValueSensor(
    task_id='wait_for_processing_complete',
    table_name='job_status',
    partition_key_name='job_id',
    partition_key_value='batch_job_001',
    attribute_name='status',
    attribute_value='COMPLETED',
    poke_interval=30,
    timeout=3600,  # 1 hour timeout
    aws_conn_id='aws_default'
)

# Load processed data from S3 to DynamoDB
load_results = S3ToDynamoDBOperator(
    task_id='load_processed_results',
    s3_bucket='processed-data-bucket',
    s3_key='batch-results/job_001/results.json',
    dynamodb_table='processed_results',
    table_keys=['result_id', 'timestamp'],  # Partition and sort keys
    input_format='DYNAMODB_JSON',
    aws_conn_id='aws_default'
)

wait_for_completion >> load_results

S3 to DynamoDB Data Pipeline

from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator

# Import customer data from S3 to DynamoDB
import_customers = S3ToDynamoDBOperator(
    task_id='import_customer_data',
    s3_bucket='data-lake-bucket',
    s3_key='customer-data/{{ ds }}/customers.json.gz',
    dynamodb_table='customers',
    table_keys=['customer_id'],
    input_format='DYNAMODB_JSON',
    input_compression='GZIP',
    aws_conn_id='aws_default'
)

DynamoDB to S3 Export for Analytics

from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

def transform_user_data(item):
    """Transform DynamoDB item for analytics."""
    return {
        'user_id': item['user_id'],
        'name': item['name'],
        'tier': item['tier'],
        'signup_month': item['signup_date'][:7],  # YYYY-MM
        'is_premium': item['tier'] == 'premium'
    }

# Export user data to S3 for analytics
export_for_analytics = DynamoDBToS3Operator(
    task_id='export_users_for_analytics',
    dynamodb_table_name='users',
    s3_bucket_name='analytics-data-bucket',
    s3_key_prefix='user-data/{{ ds }}/',
    dynamodb_scan_kwargs={
        'ProjectionExpression': 'user_id, #n, email, tier, signup_date',
        'ExpressionAttributeNames': {'#n': 'name'},
        'FilterExpression': 'attribute_exists(tier)'
    },
    process_func=transform_user_data,
    s3_key_prefix_datetime_format='%Y/%m/%d',
    aws_conn_id='aws_default'
)

Complex Multi-Table Operation

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

def sync_customer_orders(**context):
    """Synchronize customer data across multiple DynamoDB tables."""
    # Initialize hooks for different tables
    customers_hook = DynamoDBHook(
        table_name='customers',
        table_keys=['customer_id'],
        aws_conn_id='aws_default'
    )
    
    orders_hook = DynamoDBHook(
        table_name='orders',
        table_keys=['order_id'],
        aws_conn_id='aws_default'
    )
    
    # Process and sync data between tables
    # Implementation would fetch from one table and update another
    return "Synchronization completed"

# Multi-table synchronization task
sync_tables = PythonOperator(
    task_id='sync_customer_orders',
    python_callable=sync_customer_orders
)

Import Status Monitoring

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

class DynamoDBImportSensor(AwsBaseSensor):
    """Monitor DynamoDB import job status."""

    aws_hook_class = DynamoDBHook

    def __init__(self, import_arn: str, **kwargs):
        super().__init__(**kwargs)
        self.import_arn = import_arn

    def poke(self, context):
        status, error_code, error_msg = self.hook.get_import_status(self.import_arn)
        
        if status == 'COMPLETED':
            return True
        elif status == 'FAILED':
            raise AirflowException(f"Import failed: {error_code} - {error_msg}")
        else:
            self.log.info("Import status: %s", status)
            return False

# Monitor import job
monitor_import = DynamoDBImportSensor(
    task_id='monitor_import_job',
    import_arn='arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/import/01234567890123-abcdefgh',
    poke_interval=60,
    timeout=3600,
    aws_conn_id='aws_default'
)

Import Statements

from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
