Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
Amazon DynamoDB integration in this provider covers managed NoSQL database operations, including batch writes, import/export to and from S3, and integration with other AWS services in data pipeline workflows.
Efficiently write large volumes of data to DynamoDB tables with automatic retry and error handling.
class DynamoDBHook(AwsBaseHook):
"""
Hook for Amazon DynamoDB operations.
Parameters:
- table_keys: list - partition key and sort key for the table
- table_name: str - target DynamoDB table name
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
"""
def __init__(
self,
table_keys: list | None = None,
table_name: str | None = None,
aws_conn_id: str = 'aws_default',
region_name: str | None = None,
**kwargs
): ...
def write_batch_data(self, items: Iterable) -> bool:
"""
Write batch items to DynamoDB table with provisioned throughput capacity.
Parameters:
- items: Iterable - list of DynamoDB items to write
Returns:
bool: True if batch write succeeds
"""
...
def get_import_status(self, import_arn: str) -> tuple[str, str, str]:
"""
Get import status from DynamoDB.
Parameters:
- import_arn: str - Amazon Resource Name (ARN) for the import
Returns:
tuple: Import status, error code, error message
"""
...
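A quick, self-contained sketch of using the hook directly inside a task callable, assuming the spec above; the table name, items, and import ARN are illustrative:
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook

hook = DynamoDBHook(table_name='events', table_keys=['event_id'])

# Batch-write a couple of items; returns True on success per the spec above.
ok = hook.write_batch_data([
    {'event_id': 'evt-001', 'event_type': 'click'},
    {'event_id': 'evt-002', 'event_type': 'view'},
])

# Poll an S3 import started elsewhere (the ARN is a placeholder).
status, error_code, error_msg = hook.get_import_status(
    import_arn='arn:aws:dynamodb:us-east-1:111122223333:table/events/import/0123456-abcdefgh'
)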
Monitor DynamoDB table attributes and wait for specific values or conditions.
class DynamoDBValueSensor(AwsBaseSensor):
"""
Wait for an attribute value to be present for an item in a DynamoDB table.
Parameters:
- table_name: str - DynamoDB table name
- partition_key_name: str - DynamoDB partition key name
- partition_key_value: str - DynamoDB partition key value
- attribute_name: str - DynamoDB attribute name to monitor
- attribute_value: Any - expected DynamoDB attribute value
- sort_key_name: str - DynamoDB sort key name (optional)
- sort_key_value: str - DynamoDB sort key value (optional)
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- poke_interval: int - time in seconds between checks
- timeout: int - maximum time to wait
- mode: str - sensor mode ('poke' or 'reschedule')
Returns:
bool: True when attribute value matches expected value
"""
def __init__(
self,
table_name: str,
partition_key_name: str,
partition_key_value: str,
attribute_name: str,
attribute_value: Any,
sort_key_name: str | None = None,
sort_key_value: str | None = None,
aws_conn_id: str = 'aws_default',
region_name: str | None = None,
**kwargs
): ...
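For tables with a composite primary key, the sensor can match a single item by passing both keys. A minimal sketch, assuming the signature above; the table, key, and attribute names are illustrative:
from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor

# Wait until one order line item reaches the SHIPPED state.
wait_for_shipment = DynamoDBValueSensor(
    task_id='wait_for_shipment',
    table_name='orders',
    partition_key_name='order_id',
    partition_key_value='order_12345',
    sort_key_name='line_item',
    sort_key_value='1',
    attribute_name='status',
    attribute_value='SHIPPED',
    poke_interval=60,
    timeout=1800,
    aws_conn_id='aws_default',
)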
Transfer data between DynamoDB and S3 for backup, analytics, and data pipeline operations.
class S3ToDynamoDBOperator(BaseOperator):
"""
Load data from S3 to DynamoDB table.
Parameters:
- s3_bucket: str - S3 bucket name containing source data
- s3_key: str - S3 key path to source data
- dynamodb_table: str - target DynamoDB table name
- table_keys: list - partition key and sort key configuration
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- input_format: str - input data format ('CSV', 'DYNAMODB_JSON', 'ION')
- input_compression: str - compression type ('GZIP', 'ZSTD', 'NONE')
Returns:
str: Import job ARN
"""
def __init__(
self,
s3_bucket: str,
s3_key: str,
dynamodb_table: str,
table_keys: list | None = None,
aws_conn_id: str = 'aws_default',
region_name: str | None = None,
input_format: str = 'DYNAMODB_JSON',
input_compression: str = 'NONE',
**kwargs
): ...
class DynamoDBToS3Operator(BaseOperator):
"""
Export data from DynamoDB table to S3.
Parameters:
- dynamodb_table_name: str - source DynamoDB table name
- s3_bucket_name: str - target S3 bucket name
- s3_key_prefix: str - S3 key prefix for exported data
- dynamodb_scan_kwargs: dict - additional scan parameters
- s3_key_prefix_datetime_format: str - datetime format for S3 key prefix
- process_func: Callable - function to process items before export
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
Returns:
str: S3 export location
"""
def __init__(
self,
dynamodb_table_name: str,
s3_bucket_name: str,
s3_key_prefix: str = '',
dynamodb_scan_kwargs: dict | None = None,
s3_key_prefix_datetime_format: str = '%Y/%m/%d',
process_func: Callable | None = None,
aws_conn_id: str = 'aws_default',
region_name: str | None = None,
**kwargs
): ...
Transfer data between Apache Hive and DynamoDB for data warehouse integration.
class HiveToDynamoDBOperator(BaseOperator):
"""
Move data from Hive to DynamoDB.
Parameters:
- sql: str - SQL query to extract data from Hive
- table_name: str - target DynamoDB table name
- table_keys: list - partition key and sort key configuration
- pre_process: Callable - function to preprocess data before insertion
- pre_process_args: tuple - arguments for preprocessing function
- pre_process_kwargs: dict - keyword arguments for preprocessing function
- region_name: str - AWS region name
- schema: str - Hive database schema
- hiveserver2_conn_id: str - Airflow connection for Hive
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
bool: True if transfer succeeds
"""
def __init__(
self,
sql: str,
table_name: str,
table_keys: list | None = None,
pre_process: Callable | None = None,
pre_process_args: tuple | None = None,
pre_process_kwargs: dict | None = None,
region_name: str | None = None,
schema: str = 'default',
hiveserver2_conn_id: str = 'hiveserver2_default',
aws_conn_id: str = 'aws_default',
**kwargs
): ...
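A usage sketch for the Hive transfer, assuming the signature above; the query, table names, and connection ids are illustrative:
from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator

def rows_to_items(rows):
    """Map Hive result rows to DynamoDB items (schema is illustrative)."""
    return [{'metric_id': row[0], 'metric_value': str(row[1])} for row in rows]

load_daily_metrics = HiveToDynamoDBOperator(
    task_id='load_daily_metrics',
    sql='SELECT metric_id, metric_value FROM daily_metrics',
    table_name='metrics',
    table_keys=['metric_id'],
    pre_process=rows_to_items,
    schema='analytics',
    hiveserver2_conn_id='hiveserver2_default',
    aws_conn_id='aws_default',
)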
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
def load_user_data(**context):
"""Load user data into DynamoDB."""
dynamodb_hook = DynamoDBHook(
table_name='users',
table_keys=['user_id'], # Partition key
aws_conn_id='aws_default'
)
# Sample user data
users = [
{
'user_id': '001',
'name': 'Alice Johnson',
'email': 'alice@example.com',
'tier': 'premium',
'signup_date': '2023-01-15'
},
{
'user_id': '002',
'name': 'Bob Smith',
'email': 'bob@example.com',
'tier': 'standard',
'signup_date': '2023-02-20'
}
]
# Write batch data
success = dynamodb_hook.write_batch_data(users)
return f"Batch write success: {success}"
# Use with PythonOperator
load_users = PythonOperator(
task_id='load_user_data',
python_callable=load_user_data
)
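The task snippets in these examples assume an enclosing DAG. A minimal sketch of the surrounding file (the dag_id and schedule are illustrative):
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='dynamodb_user_load',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    load_users = PythonOperator(
        task_id='load_user_data',
        python_callable=load_user_data,  # the callable defined above
    )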
from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator
# Wait for processing status to be complete
wait_for_completion = DynamoDBValueSensor(
task_id='wait_for_processing_complete',
table_name='job_status',
partition_key_name='job_id',
partition_key_value='batch_job_001',
attribute_name='status',
attribute_value='COMPLETED',
poke_interval=30,
timeout=3600, # 1 hour timeout
aws_conn_id='aws_default'
)
# Load processed data from S3 to DynamoDB
load_results = S3ToDynamoDBOperator(
task_id='load_processed_results',
s3_bucket='processed-data-bucket',
s3_key='batch-results/job_001/results.json',
dynamodb_table='processed_results',
table_keys=['result_id', 'timestamp'], # Partition and sort keys
input_format='DYNAMODB_JSON',
aws_conn_id='aws_default'
)
wait_for_completion >> load_results
from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator
# Import customer data from S3 to DynamoDB
import_customers = S3ToDynamoDBOperator(
task_id='import_customer_data',
s3_bucket='data-lake-bucket',
s3_key='customer-data/{{ ds }}/customers.json.gz',
dynamodb_table='customers',
table_keys=['customer_id'],
input_format='DYNAMODB_JSON',
input_compression='GZIP',
aws_conn_id='aws_default'
)
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
def transform_user_data(item):
"""Transform DynamoDB item for analytics."""
return {
'user_id': item['user_id'],
'name': item['name'],
'tier': item['tier'],
'signup_month': item['signup_date'][:7], # YYYY-MM
'is_premium': item['tier'] == 'premium'
}
# Export user data to S3 for analytics
export_for_analytics = DynamoDBToS3Operator(
task_id='export_users_for_analytics',
dynamodb_table_name='users',
s3_bucket_name='analytics-data-bucket',
s3_key_prefix='user-data/{{ ds }}/',
dynamodb_scan_kwargs={
'ProjectionExpression': 'user_id, #n, email, tier, signup_date',
'ExpressionAttributeNames': {'#n': 'name'},
'FilterExpression': 'attribute_exists(tier)'
},
process_func=transform_user_data,
s3_key_prefix_datetime_format='%Y/%m/%d',
aws_conn_id='aws_default'
)
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
def sync_customer_orders(**context):
"""Synchronize customer data across multiple DynamoDB tables."""
# Initialize hooks for different tables
customers_hook = DynamoDBHook(
table_name='customers',
table_keys=['customer_id'],
aws_conn_id='aws_default'
)
orders_hook = DynamoDBHook(
table_name='orders',
table_keys=['order_id'],
aws_conn_id='aws_default'
)
# Minimal sketch of the sync step: mirror customer tiers into the orders
# table. Assumes hook.conn exposes the boto3 DynamoDB ServiceResource, as
# AwsBaseHook does for resource-type hooks; the item schema is illustrative.
customers = customers_hook.conn.Table('customers').scan().get('Items', [])
orders_hook.write_batch_data(
[{'order_id': c['customer_id'], 'customer_tier': c.get('tier', 'standard')} for c in customers]
)
return "Synchronization completed"
# Multi-table synchronization task
sync_tables = PythonOperator(
task_id='sync_customer_orders',
python_callable=sync_customer_orders
)
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
class DynamoDBImportSensor(AwsBaseSensor):
"""Monitor DynamoDB import job status."""
# AwsBaseSensor reads the hook class from a class attribute, not the instance.
aws_hook_class = DynamoDBHook
def __init__(self, import_arn: str, **kwargs):
super().__init__(**kwargs)
self.import_arn = import_arn
def poke(self, context):
# AwsBaseSensor exposes the configured hook via the `hook` property.
status, error_code, error_msg = self.hook.get_import_status(self.import_arn)
if status == 'COMPLETED':
return True
elif status == 'FAILED':
raise AirflowException(f"Import failed: {error_code} - {error_msg}")
else:
self.log.info("Import status: %s", status)
return False
# Monitor import job
monitor_import = DynamoDBImportSensor(
task_id='monitor_import_job',
import_arn='arn:aws:dynamodb:us-west-2:123456789012:table/MyTable/import/01234567890123-abcdefgh',
poke_interval=60,
timeout=3600,
aws_conn_id='aws_default'
)
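These pieces compose: per the spec above, S3ToDynamoDBOperator returns the import job ARN, which Airflow pushes to XCom as the task's return value, so the custom sensor can consume it. A hedged sketch, assuming DynamoDBImportSensor lists import_arn in its template_fields so the Jinja string renders:
# Pull the ARN returned by the import task defined earlier.
monitor_customer_import = DynamoDBImportSensor(
    task_id='monitor_customer_import',
    import_arn="{{ ti.xcom_pull(task_ids='import_customer_data') }}",
    poke_interval=60,
    timeout=3600,
    aws_conn_id='aws_default',
)
import_customers >> monitor_customer_import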
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.sensors.dynamodb import DynamoDBValueSensor
from airflow.providers.amazon.aws.transfers.s3_to_dynamodb import S3ToDynamoDBOperator
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon